Posted to commits@mahout.apache.org by pa...@apache.org on 2015/04/01 20:07:44 UTC
[13/51] [partial] mahout git commit: MAHOUT-1655 Refactors mr-legacy
into mahout-hdfs and mahout-mr, closes apache/mahout#86
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/term/TermCountCombiner.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/term/TermCountCombiner.java b/mr/src/main/java/org/apache/mahout/vectorizer/term/TermCountCombiner.java
new file mode 100644
index 0000000..4c63333
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/term/TermCountCombiner.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.term;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+/**
+ * Combiner that partially sums per-word counts before they reach {@link TermCountReducer}.
+ */
+public class TermCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
+
+ @Override
+ protected void reduce(Text key, Iterable<LongWritable> values, Context context)
+ throws IOException, InterruptedException {
+ long sum = 0;
+ for (LongWritable value : values) {
+ sum += value.get();
+ }
+ context.write(key, new LongWritable(sum));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/term/TermCountMapper.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/term/TermCountMapper.java b/mr/src/main/java/org/apache/mahout/vectorizer/term/TermCountMapper.java
new file mode 100644
index 0000000..9af3d57
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/term/TermCountMapper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.term;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.common.StringTuple;
+import org.apache.mahout.math.function.ObjectLongProcedure;
+import org.apache.mahout.math.map.OpenObjectLongHashMap;
+
+/**
+ * TextVectorizer Term Count Mapper. Counts the occurrences of each word in an already tokenized document and emits one count per distinct word.
+ */
+public class TermCountMapper extends Mapper<Text, StringTuple, Text, LongWritable> {
+
+ @Override
+ protected void map(Text key, StringTuple value, final Context context) throws IOException, InterruptedException {
+ OpenObjectLongHashMap<String> wordCount = new OpenObjectLongHashMap<>();
+ for (String word : value.getEntries()) {
+ if (wordCount.containsKey(word)) {
+ wordCount.put(word, wordCount.get(word) + 1);
+ } else {
+ wordCount.put(word, 1);
+ }
+ }
+ wordCount.forEachPair(new ObjectLongProcedure<String>() {
+ @Override
+ public boolean apply(String first, long second) {
+ try {
+ context.write(new Text(first), new LongWritable(second));
+ } catch (IOException e) {
+ context.getCounter("Exception", "Output IO Exception").increment(1);
+ } catch (InterruptedException e) {
+ context.getCounter("Exception", "Interrupted Exception").increment(1);
+ }
+ return true;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/term/TermCountReducer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/term/TermCountReducer.java b/mr/src/main/java/org/apache/mahout/vectorizer/term/TermCountReducer.java
new file mode 100644
index 0000000..388bfc2
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/term/TermCountReducer.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.term;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.vectorizer.DictionaryVectorizer;
+
+/**
+ * Sums the per-document counts for each word and emits the total, dropping words whose total count is below the configured minimum support.
+ *
+ * @see TermCountCombiner
+ */
+public class TermCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
+
+ private int minSupport;
+
+ @Override
+ protected void reduce(Text key, Iterable<LongWritable> values, Context context)
+ throws IOException, InterruptedException {
+ long sum = 0;
+ for (LongWritable value : values) {
+ sum += value.get();
+ }
+ if (sum >= minSupport) {
+ context.write(key, new LongWritable(sum));
+ }
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ minSupport = context.getConfiguration().getInt(DictionaryVectorizer.MIN_SUPPORT,
+ DictionaryVectorizer.DEFAULT_MIN_SUPPORT);
+ }
+
+}
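
TermCountMapper, TermCountCombiner and TermCountReducer together form a word-count style pipeline over already tokenized documents: the mapper emits per-document counts, the combiner pre-aggregates them, and the reducer sums the totals and applies the minimum-support filter. The driver that actually wires them up is not part of this diff, so the following is only a minimal sketch of how such a job could be configured (imports as in TFIDFConverter below; the paths and the minimum-support value are placeholder assumptions):

    // Minimal sketch only; the real driver lives elsewhere in the vectorizer code.
    // Input: SequenceFile of <Text docId, StringTuple tokens>; output: <Text word, LongWritable count>.
    Configuration conf = new Configuration();
    conf.setInt(DictionaryVectorizer.MIN_SUPPORT, 2);   // placeholder: drop words seen fewer than 2 times
    Job job = new Job(conf);
    job.setJobName("term count sketch");
    job.setJarByClass(TermCountReducer.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setMapperClass(TermCountMapper.class);
    job.setCombinerClass(TermCountCombiner.class);
    job.setReducerClass(TermCountReducer.class);
    FileInputFormat.setInputPaths(job, new Path("tokenized-documents"));   // placeholder path
    FileOutputFormat.setOutputPath(job, new Path("wordcount"));            // placeholder path
    if (!job.waitForCompletion(true)) {
      throw new IllegalStateException("Job failed!");
    }
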
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/term/TermDocumentCountMapper.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/term/TermDocumentCountMapper.java b/mr/src/main/java/org/apache/mahout/vectorizer/term/TermDocumentCountMapper.java
new file mode 100644
index 0000000..30828bf
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/term/TermDocumentCountMapper.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.term;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * TextVectorizer Document Frequency Count Mapper. Emits 1 for every feature present in a document, plus 1 under the sentinel key -1 to count the document itself.
+ */
+public class TermDocumentCountMapper extends Mapper<WritableComparable<?>, VectorWritable, IntWritable, LongWritable> {
+
+ private static final LongWritable ONE = new LongWritable(1);
+
+ private static final IntWritable TOTAL_COUNT = new IntWritable(-1);
+
+ private final IntWritable out = new IntWritable();
+
+ @Override
+ protected void map(WritableComparable<?> key, VectorWritable value, Context context)
+ throws IOException, InterruptedException {
+ Vector vector = value.get();
+ for (Vector.Element e : vector.nonZeroes()) {
+ out.set(e.index());
+ context.write(out, ONE);
+ }
+ context.write(TOTAL_COUNT, ONE);
+ }
+}
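
The mapper above writes the sentinel key -1 (TOTAL_COUNT) once per document, so after the counts are summed the record under key -1 holds the total number of documents while every non-negative key holds the document frequency of one feature; TFIDFConverter.createDictionaryChunks below depends on exactly this convention (its key.get() == -1 branch). A small hedged sketch of reading the reduced df-count output back (the path is a placeholder assumption):

    // Sketch only: interpreting the df-count output produced by the mapper/reducer pair.
    Configuration conf = new Configuration();
    for (Pair<IntWritable, LongWritable> record
        : new SequenceFileDirIterable<IntWritable, LongWritable>(
            new Path("df-count/part-*"), PathType.GLOB, null, null, true, conf)) {
      if (record.getFirst().get() == -1) {
        System.out.println("total documents = " + record.getSecond().get());
      } else {
        System.out.println("feature " + record.getFirst().get()
            + " appears in " + record.getSecond().get() + " documents");
      }
    }
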
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/term/TermDocumentCountReducer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/term/TermDocumentCountReducer.java b/mr/src/main/java/org/apache/mahout/vectorizer/term/TermDocumentCountReducer.java
new file mode 100644
index 0000000..c815692
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/term/TermDocumentCountReducer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.term;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Sums the document frequency counts for each feature. Can also be used as a local Combiner.
+ */
+public class TermDocumentCountReducer extends Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
+
+ @Override
+ protected void reduce(IntWritable key, Iterable<LongWritable> values, Context context)
+ throws IOException, InterruptedException {
+ long sum = 0;
+ for (LongWritable value : values) {
+ sum += value.get();
+ }
+ context.write(key, new LongWritable(sum));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java b/mr/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java
new file mode 100644
index 0000000..5f9d666
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.tfidf;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.vectorizer.common.PartialVectorMerger;
+import org.apache.mahout.vectorizer.term.TermDocumentCountMapper;
+import org.apache.mahout.vectorizer.term.TermDocumentCountReducer;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * This class converts a set of input vectors with term frequencies to TF-IDF vectors. The sequence file input
+ * should have a {@link org.apache.hadoop.io.WritableComparable} key and a
+ * {@link VectorWritable} value containing the
+ * term frequency vector. This conversion class uses multiple map/reduce jobs to convert the vectors to
+ * TF-IDF format.
+ *
+ */
+public final class TFIDFConverter {
+
+ public static final String VECTOR_COUNT = "vector.count";
+ public static final String FEATURE_COUNT = "feature.count";
+ public static final String MIN_DF = "min.df";
+ public static final String MAX_DF = "max.df";
+ //public static final String TFIDF_OUTPUT_FOLDER = "tfidf";
+
+ private static final String DOCUMENT_VECTOR_OUTPUT_FOLDER = "tfidf-vectors";
+ public static final String FREQUENCY_FILE = "frequency.file-";
+ private static final int MAX_CHUNKSIZE = 10000;
+ private static final int MIN_CHUNKSIZE = 100;
+ private static final String OUTPUT_FILES_PATTERN = "part-*";
+ private static final int SEQUENCEFILE_BYTE_OVERHEAD = 45;
+ private static final String VECTOR_OUTPUT_FOLDER = "partial-vectors-";
+ public static final String WORDCOUNT_OUTPUT_FOLDER = "df-count";
+
+ /**
+ * This class cannot be instantiated; use the static methods.
+ */
+ private TFIDFConverter() {}
+
+ /**
+ * Create Term Frequency-Inverse Document Frequency (TF-IDF) vectors from the input set of vectors in
+ * {@link SequenceFile} format. This job uses a fixed limit on the maximum memory used by the feature chunk
+ * per node, thereby splitting the process across multiple map/reduce passes.
+ * {@link #calculateDF} must be called before this method.
+ *
+ * @param input
+ * input directory of the vectors in {@link SequenceFile} format
+ * @param output
+ * output directory where {@link org.apache.mahout.math.RandomAccessSparseVector}'s of the document
+ * are generated
+ * @param datasetFeatures
+ * Document frequencies information calculated by calculateDF
+ * @param minDf
+ * The minimum document frequency. Default 1
+ * @param maxDF
+ * The max percentage of vectors for the DF. Can be used to remove really high frequency features.
+ * Expressed as an integer between 0 and 100. Default 99
+ * @param numReducers
+ * The number of reducers to spawn. This also affects the possible parallelism since each reducer
+ * will typically produce a single output file containing tf-idf vectors for a subset of the
+ * documents in the corpus.
+ */
+ public static void processTfIdf(Path input,
+ Path output,
+ Configuration baseConf,
+ Pair<Long[], List<Path>> datasetFeatures,
+ int minDf,
+ long maxDF,
+ float normPower,
+ boolean logNormalize,
+ boolean sequentialAccessOutput,
+ boolean namedVector,
+ int numReducers) throws IOException, InterruptedException, ClassNotFoundException {
+ Preconditions.checkArgument(normPower == PartialVectorMerger.NO_NORMALIZING || normPower >= 0,
+ "If specified, normPower must be non-negative", normPower);
+ Preconditions.checkArgument(normPower == PartialVectorMerger.NO_NORMALIZING
+ || (normPower > 1 && !Double.isInfinite(normPower))
+ || !logNormalize,
+ "normPower must be > 1 and not infinite if log normalization is chosen", normPower);
+
+ int partialVectorIndex = 0;
+ List<Path> partialVectorPaths = Lists.newArrayList();
+ List<Path> dictionaryChunks = datasetFeatures.getSecond();
+ for (Path dictionaryChunk : dictionaryChunks) {
+ Path partialVectorOutputPath = new Path(output, VECTOR_OUTPUT_FOLDER + partialVectorIndex++);
+ partialVectorPaths.add(partialVectorOutputPath);
+ makePartialVectors(input,
+ baseConf,
+ datasetFeatures.getFirst()[0],
+ datasetFeatures.getFirst()[1],
+ minDf,
+ maxDF,
+ dictionaryChunk,
+ partialVectorOutputPath,
+ sequentialAccessOutput,
+ namedVector);
+ }
+
+ Configuration conf = new Configuration(baseConf);
+
+ Path outputDir = new Path(output, DOCUMENT_VECTOR_OUTPUT_FOLDER);
+
+ PartialVectorMerger.mergePartialVectors(partialVectorPaths,
+ outputDir,
+ baseConf,
+ normPower,
+ logNormalize,
+ datasetFeatures.getFirst()[0].intValue(),
+ sequentialAccessOutput,
+ namedVector,
+ numReducers);
+ HadoopUtil.delete(conf, partialVectorPaths);
+
+ }
+
+ /**
+ * Calculates the document frequencies of all terms from the input set of vectors in
+ * {@link SequenceFile} format. This job uses a fixed limit on the maximum memory used by the feature chunk
+ * per node, thereby splitting the process across multiple map/reduce passes.
+ *
+ * @param input
+ * input directory of the vectors in {@link SequenceFile} format
+ * @param output
+ * output directory where document frequencies will be stored
+ * @param chunkSizeInMegabytes
+ * the size in MB of the feature-to-id chunk to be kept in memory at each node during the Map/Reduce
+ * stage. It is recommended to calculate this from the number of cores and the free memory
+ * available per node. For example, with 2 cores and around 1GB of memory to spare, a chunk size of
+ * around 400-500MB lets two simultaneous reducers create partial vectors without thrashing the
+ * system due to increased swapping.
+ */
+ public static Pair<Long[],List<Path>> calculateDF(Path input,
+ Path output,
+ Configuration baseConf,
+ int chunkSizeInMegabytes)
+ throws IOException, InterruptedException, ClassNotFoundException {
+
+ if (chunkSizeInMegabytes < MIN_CHUNKSIZE) {
+ chunkSizeInMegabytes = MIN_CHUNKSIZE;
+ } else if (chunkSizeInMegabytes > MAX_CHUNKSIZE) { // 10GB
+ chunkSizeInMegabytes = MAX_CHUNKSIZE;
+ }
+
+ Path wordCountPath = new Path(output, WORDCOUNT_OUTPUT_FOLDER);
+
+ startDFCounting(input, wordCountPath, baseConf);
+
+ return createDictionaryChunks(wordCountPath, output, baseConf, chunkSizeInMegabytes);
+ }
+
+ /**
+ * Read the document frequency list built at the end of the DF count job. This uses constant
+ * memory and runs at the speed of your disk read.
+ */
+ private static Pair<Long[], List<Path>> createDictionaryChunks(Path featureCountPath,
+ Path dictionaryPathBase,
+ Configuration baseConf,
+ int chunkSizeInMegabytes) throws IOException {
+ List<Path> chunkPaths = Lists.newArrayList();
+ Configuration conf = new Configuration(baseConf);
+
+ FileSystem fs = FileSystem.get(featureCountPath.toUri(), conf);
+
+ long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;
+ int chunkIndex = 0;
+ Path chunkPath = new Path(dictionaryPathBase, FREQUENCY_FILE + chunkIndex);
+ chunkPaths.add(chunkPath);
+ SequenceFile.Writer freqWriter =
+ new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class, LongWritable.class);
+
+ try {
+ long currentChunkSize = 0;
+ long featureCount = 0;
+ long vectorCount = Long.MAX_VALUE;
+ Path filesPattern = new Path(featureCountPath, OUTPUT_FILES_PATTERN);
+ for (Pair<IntWritable,LongWritable> record
+ : new SequenceFileDirIterable<IntWritable,LongWritable>(filesPattern,
+ PathType.GLOB,
+ null,
+ null,
+ true,
+ conf)) {
+
+ if (currentChunkSize > chunkSizeLimit) {
+ Closeables.close(freqWriter, false);
+ chunkIndex++;
+
+ chunkPath = new Path(dictionaryPathBase, FREQUENCY_FILE + chunkIndex);
+ chunkPaths.add(chunkPath);
+
+ freqWriter = new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class, LongWritable.class);
+ currentChunkSize = 0;
+ }
+
+ int fieldSize = SEQUENCEFILE_BYTE_OVERHEAD + Integer.SIZE / 8 + Long.SIZE / 8;
+ currentChunkSize += fieldSize;
+ IntWritable key = record.getFirst();
+ LongWritable value = record.getSecond();
+ if (key.get() >= 0) {
+ freqWriter.append(key, value);
+ } else if (key.get() == -1) {
+ vectorCount = value.get();
+ }
+ featureCount = Math.max(key.get(), featureCount);
+
+ }
+ featureCount++;
+ Long[] counts = {featureCount, vectorCount};
+ return new Pair<>(counts, chunkPaths);
+ } finally {
+ Closeables.close(freqWriter, false);
+ }
+ }
+
+ /**
+ * Create a partial TF-IDF vector using a chunk of features from the input vectors. The input vectors have
+ * to be in {@link SequenceFile} format.
+ *
+ * @param input
+ * input directory of the vectors in {@link SequenceFile} format
+ * @param featureCount
+ * Number of unique features in the dataset
+ * @param vectorCount
+ * Number of vectors in the dataset
+ * @param minDf
+ * The minimum document frequency. Default 1
+ * @param maxDF
+ * The max percentage of vectors for the DF. Can be used to remove really high frequency features.
+ * Expressed as an integer between 0 and 100. Default 99
+ * @param dictionaryFilePath
+ * location of the chunk of features and the id's
+ * @param output
+ * output directory where the partial vectors are created
+ * @param sequentialAccess
+ * output vectors should be optimized for sequential access
+ * @param namedVector
+ * output vectors should be named, retaining key (doc id) as a label
+ */
+ private static void makePartialVectors(Path input,
+ Configuration baseConf,
+ Long featureCount,
+ Long vectorCount,
+ int minDf,
+ long maxDF,
+ Path dictionaryFilePath,
+ Path output,
+ boolean sequentialAccess,
+ boolean namedVector)
+ throws IOException, InterruptedException, ClassNotFoundException {
+
+ Configuration conf = new Configuration(baseConf);
+ // this conf parameter needs to be set to enable serialization of conf values
+ conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
+ conf.setLong(FEATURE_COUNT, featureCount);
+ conf.setLong(VECTOR_COUNT, vectorCount);
+ conf.setInt(MIN_DF, minDf);
+ conf.setLong(MAX_DF, maxDF);
+ conf.setBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, sequentialAccess);
+ conf.setBoolean(PartialVectorMerger.NAMED_VECTOR, namedVector);
+ DistributedCache.addCacheFile(dictionaryFilePath.toUri(), conf);
+
+ Job job = new Job(conf);
+ job.setJobName(": MakePartialVectors: input-folder: " + input + ", dictionary-file: "
+ + dictionaryFilePath.toString());
+ job.setJarByClass(TFIDFConverter.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(VectorWritable.class);
+ FileInputFormat.setInputPaths(job, input);
+
+ FileOutputFormat.setOutputPath(job, output);
+
+ job.setMapperClass(Mapper.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setReducerClass(TFIDFPartialVectorReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+ HadoopUtil.delete(conf, output);
+
+ boolean succeeded = job.waitForCompletion(true);
+ if (!succeeded) {
+ throw new IllegalStateException("Job failed!");
+ }
+ }
+
+ /**
+ * Count the document frequencies of features in parallel using Map/Reduce. The input documents have to be
+ * in {@link SequenceFile} format
+ */
+ private static void startDFCounting(Path input, Path output, Configuration baseConf)
+ throws IOException, InterruptedException, ClassNotFoundException {
+
+ Configuration conf = new Configuration(baseConf);
+ // this conf parameter needs to be set to enable serialization of conf values
+ conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
+
+ Job job = new Job(conf);
+ job.setJobName("VectorTfIdf Document Frequency Count running over input: " + input);
+ job.setJarByClass(TFIDFConverter.class);
+
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(LongWritable.class);
+
+ FileInputFormat.setInputPaths(job, input);
+ FileOutputFormat.setOutputPath(job, output);
+
+ job.setMapperClass(TermDocumentCountMapper.class);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setCombinerClass(TermDocumentCountReducer.class);
+ job.setReducerClass(TermDocumentCountReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+ HadoopUtil.delete(conf, output);
+
+ boolean succeeded = job.waitForCompletion(true);
+ if (!succeeded) {
+ throw new IllegalStateException("Job failed!");
+ }
+ }
+}
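
As the javadocs above state, calculateDF has to run before processTfIdf: the first pass produces the document frequency chunks plus the (featureCount, vectorCount) pair that the second pass needs. A hedged end-to-end sketch follows; the paths, chunk size and tuning values are placeholder assumptions, not recommended settings:

    // Sketch only: driving the two passes of TFIDFConverter.
    Configuration conf = new Configuration();
    Path tfVectors = new Path("tf-vectors");   // placeholder: term frequency vectors
    Path outputDir = new Path("tfidf");        // placeholder output directory

    // Pass 1: count document frequencies and chunk them for the distributed cache.
    Pair<Long[], List<Path>> datasetFeatures =
        TFIDFConverter.calculateDF(tfVectors, outputDir, conf, 100 /* chunk size in MB */);

    // Pass 2: weight the TF vectors with the document frequency information.
    TFIDFConverter.processTfIdf(tfVectors,
                                outputDir,
                                conf,
                                datasetFeatures,
                                1,                                   // minDf
                                99,                                  // maxDF percentage
                                PartialVectorMerger.NO_NORMALIZING,  // normPower
                                false,                               // logNormalize
                                true,                                // sequentialAccessOutput
                                false,                               // namedVector
                                1);                                  // numReducers
    // The final vectors are written under new Path(outputDir, "tfidf-vectors").
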
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java b/mr/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
new file mode 100644
index 0000000..1e71ed8
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.tfidf;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
+import org.apache.mahout.vectorizer.TFIDF;
+import org.apache.mahout.vectorizer.common.PartialVectorMerger;
+
+/**
+ * Converts a document's term frequency vector into a TF-IDF weighted sparse vector.
+ */
+public class TFIDFPartialVectorReducer extends
+ Reducer<WritableComparable<?>, VectorWritable, WritableComparable<?>, VectorWritable> {
+
+ private final OpenIntLongHashMap dictionary = new OpenIntLongHashMap();
+ private final TFIDF tfidf = new TFIDF();
+
+ private int minDf = 1;
+ private long maxDf = -1;
+ private long vectorCount = 1;
+ private long featureCount;
+ private boolean sequentialAccess;
+ private boolean namedVector;
+
+ @Override
+ protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context context)
+ throws IOException, InterruptedException {
+ Iterator<VectorWritable> it = values.iterator();
+ if (!it.hasNext()) {
+ return;
+ }
+ Vector value = it.next().get();
+ Vector vector = new RandomAccessSparseVector((int) featureCount, value.getNumNondefaultElements());
+ for (Vector.Element e : value.nonZeroes()) {
+ if (!dictionary.containsKey(e.index())) {
+ continue;
+ }
+ long df = dictionary.get(e.index());
+ if (maxDf > -1 && (100.0 * df) / vectorCount > maxDf) {
+ continue;
+ }
+ if (df < minDf) {
+ df = minDf;
+ }
+ vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));
+ }
+ if (sequentialAccess) {
+ vector = new SequentialAccessSparseVector(vector);
+ }
+
+ if (namedVector) {
+ vector = new NamedVector(vector, key.toString());
+ }
+
+ VectorWritable vectorWritable = new VectorWritable(vector);
+ context.write(key, vectorWritable);
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
+
+ vectorCount = conf.getLong(TFIDFConverter.VECTOR_COUNT, 1);
+ featureCount = conf.getLong(TFIDFConverter.FEATURE_COUNT, 1);
+ minDf = conf.getInt(TFIDFConverter.MIN_DF, 1);
+ maxDf = conf.getLong(TFIDFConverter.MAX_DF, -1);
+ sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
+ namedVector = conf.getBoolean(PartialVectorMerger.NAMED_VECTOR, false);
+
+ URI[] localFiles = DistributedCache.getCacheFiles(conf);
+ Path dictionaryFile = HadoopUtil.findInCacheByPartOfFilename(TFIDFConverter.FREQUENCY_FILE, localFiles);
+ // key is feature, value is the document frequency
+ for (Pair<IntWritable,LongWritable> record
+ : new SequenceFileIterable<IntWritable,LongWritable>(dictionaryFile, true, conf)) {
+ dictionary.put(record.getFirst().get(), record.getSecond().get());
+ }
+ }
+
+}
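
The reducer above only decides which features survive the minDf/maxDf filters; the actual per-element weight comes from the TFIDF class, which is not part of this diff. For orientation, the textbook TF-IDF weight such a class typically computes is tf * log(N / df), where N is the corpus size; the sketch below shows that generic formula and makes no claim about the exact smoothing the TFIDF implementation applies:

    // Textbook TF-IDF weight, for orientation only; Mahout's TFIDF class may smooth differently.
    static double tfIdfSketch(long tf, long df, long numDocs) {
      return tf * Math.log((double) numDocs / df);
    }
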
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/resources/version
----------------------------------------------------------------------
diff --git a/mr/src/main/resources/version b/mr/src/main/resources/version
new file mode 100644
index 0000000..f2ab45c
--- /dev/null
+++ b/mr/src/main/resources/version
@@ -0,0 +1 @@
+${project.version}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/cf/taste/common/CommonTest.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/cf/taste/common/CommonTest.java b/mr/src/test/java/org/apache/mahout/cf/taste/common/CommonTest.java
new file mode 100644
index 0000000..c37bcd3
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/cf/taste/common/CommonTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.common;
+
+import org.apache.mahout.cf.taste.impl.TasteTestCase;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+
+/** <p>Tests common classes.</p> */
+public final class CommonTest extends TasteTestCase {
+
+ @Test
+ public void testTasteException() {
+ // Just make sure this all doesn't, ah, throw an exception
+ TasteException te1 = new TasteException();
+ TasteException te2 = new TasteException(te1);
+ TasteException te3 = new TasteException(te2.toString(), te2);
+ TasteException te4 = new TasteException(te3.toString());
+ te4.printStackTrace(new PrintStream(new ByteArrayOutputStream()));
+ te4.printStackTrace(new PrintWriter(new OutputStreamWriter(new ByteArrayOutputStream())));
+ }
+
+ @Test
+ public void testNSUException() {
+ // Just make sure this all doesn't, ah, throw an exception
+ TasteException te1 = new NoSuchUserException();
+ TasteException te4 = new NoSuchUserException(te1.toString());
+ te4.printStackTrace(new PrintStream(new ByteArrayOutputStream()));
+ te4.printStackTrace(new PrintWriter(new OutputStreamWriter(new ByteArrayOutputStream())));
+ }
+
+ @Test
+ public void testNSIException() {
+ // Just make sure this all doesn't, ah, throw an exception
+ TasteException te1 = new NoSuchItemException();
+ TasteException te4 = new NoSuchItemException(te1.toString());
+ te4.printStackTrace(new PrintStream(new ByteArrayOutputStream()));
+ te4.printStackTrace(new PrintWriter(new OutputStreamWriter(new ByteArrayOutputStream())));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtilsTest.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtilsTest.java b/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtilsTest.java
new file mode 100644
index 0000000..b299b35
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtilsTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop;
+
+import org.apache.mahout.cf.taste.impl.TasteTestCase;
+import org.junit.Test;
+
+/** <p>Tests {@link TasteHadoopUtils}.</p> */
+public class TasteHadoopUtilsTest extends TasteTestCase {
+
+ @Test
+ public void testWithinRange() {
+ assertTrue(TasteHadoopUtils.idToIndex(0) >= 0);
+ assertTrue(TasteHadoopUtils.idToIndex(0) < Integer.MAX_VALUE);
+
+ assertTrue(TasteHadoopUtils.idToIndex(1) >= 0);
+ assertTrue(TasteHadoopUtils.idToIndex(1) < Integer.MAX_VALUE);
+
+ assertTrue(TasteHadoopUtils.idToIndex(Long.MAX_VALUE) >= 0);
+ assertTrue(TasteHadoopUtils.idToIndex(Long.MAX_VALUE) < Integer.MAX_VALUE);
+
+ assertTrue(TasteHadoopUtils.idToIndex(Integer.MAX_VALUE) >= 0);
+ assertTrue(TasteHadoopUtils.idToIndex(Integer.MAX_VALUE) < Integer.MAX_VALUE);
+ }
+}
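
The test only pins down the contract of TasteHadoopUtils.idToIndex: every long ID must map to a non-negative int strictly smaller than Integer.MAX_VALUE. One mapping that satisfies that contract is sketched below; it is an illustration only and not necessarily the hash Mahout actually uses:

    // Sketch of a mapping with the tested properties; not Mahout's actual implementation.
    static int idToIndexSketch(long id) {
      int folded = (int) (id ^ (id >>> 32));            // fold the long into an int
      return (0x7FFFFFFF & folded) % Integer.MAX_VALUE; // non-negative and < Integer.MAX_VALUE
    }
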
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/TopItemsQueueTest.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/TopItemsQueueTest.java b/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/TopItemsQueueTest.java
new file mode 100644
index 0000000..9465def
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/TopItemsQueueTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop;
+
+import org.apache.mahout.cf.taste.impl.TasteTestCase;
+import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.common.MahoutTestCase;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TopItemsQueueTest extends TasteTestCase {
+
+ @Test
+ public void topK() {
+
+ float[] ratings = {0.5f, 0.6f, 0.7f, 2.0f, 0.0f};
+
+ List<RecommendedItem> topItems = findTop(ratings, 2);
+
+ assertEquals(2, topItems.size());
+ assertEquals(3L, topItems.get(0).getItemID());
+ assertEquals(2.0f, topItems.get(0).getValue(), MahoutTestCase.EPSILON);
+ assertEquals(2L, topItems.get(1).getItemID());
+ assertEquals(0.7f, topItems.get(1).getValue(), MahoutTestCase.EPSILON);
+ }
+
+ @Test
+ public void topKInputSmallerThanK() {
+
+ float[] ratings = {0.7f, 2.0f};
+
+ List<RecommendedItem> topItems = findTop(ratings, 3);
+
+ assertEquals(2, topItems.size());
+ assertEquals(1L, topItems.get(0).getItemID());
+ assertEquals(2.0f, topItems.get(0).getValue(), MahoutTestCase.EPSILON);
+ assertEquals(0L, topItems.get(1).getItemID());
+ assertEquals(0.7f, topItems.get(1).getValue(), MahoutTestCase.EPSILON);
+ }
+
+
+ private static List<RecommendedItem> findTop(float[] ratings, int k) {
+ TopItemsQueue queue = new TopItemsQueue(k);
+
+ for (int item = 0; item < ratings.length; item++) {
+ MutableRecommendedItem top = queue.top();
+ if (ratings[item] > top.getValue()) {
+ top.set(item, ratings[item]);
+ queue.updateTop();
+ }
+ }
+
+ return queue.getTopItems();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java b/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java
new file mode 100644
index 0000000..9d37da2
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.als;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.impl.TasteTestCase;
+import org.apache.mahout.cf.taste.impl.common.FullRunningAverage;
+import org.apache.mahout.cf.taste.impl.common.RunningAverage;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.SparseRowMatrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.hadoop.MathHelper;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
+import org.apache.mahout.math.map.OpenIntObjectHashMap;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class ParallelALSFactorizationJobTest extends TasteTestCase {
+
+ private static final Logger log = LoggerFactory.getLogger(ParallelALSFactorizationJobTest.class);
+
+ private File inputFile;
+ private File intermediateDir;
+ private File outputDir;
+ private File tmpDir;
+ private Configuration conf;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ inputFile = getTestTempFile("prefs.txt");
+ intermediateDir = getTestTempDir("intermediate");
+ intermediateDir.delete();
+ outputDir = getTestTempDir("output");
+ outputDir.delete();
+ tmpDir = getTestTempDir("tmp");
+
+ conf = getConfiguration();
+ // reset as we run all tests in the same JVM
+ SharingMapper.reset();
+ }
+
+ @Test
+ public void completeJobToyExample() throws Exception {
+ explicitExample(1);
+ }
+
+ @Test
+ public void completeJobToyExampleMultithreaded() throws Exception {
+ explicitExample(2);
+ }
+
+ /**
+ * small integration test that runs the full job
+ *
+ * <pre>
+ *
+ * user-item-matrix
+ *
+ * burger hotdog berries icecream
+ * dog 5 5 2 -
+ * rabbit 2 - 3 5
+ * cow - 5 - 3
+ * donkey 3 - - 5
+ *
+ * </pre>
+ */
+ private void explicitExample(int numThreads) throws Exception {
+
+ Double na = Double.NaN;
+ Matrix preferences = new SparseRowMatrix(4, 4, new Vector[] {
+ new DenseVector(new double[] { 5.0, 5.0, 2.0, na }),
+ new DenseVector(new double[] { 2.0, na, 3.0, 5.0 }),
+ new DenseVector(new double[] { na, 5.0, na, 3.0 }),
+ new DenseVector(new double[] { 3.0, na, na, 5.0 }) });
+
+ writeLines(inputFile, preferencesAsText(preferences));
+
+ ParallelALSFactorizationJob alsFactorization = new ParallelALSFactorizationJob();
+ alsFactorization.setConf(conf);
+
+ int numFeatures = 3;
+ int numIterations = 5;
+ double lambda = 0.065;
+
+ alsFactorization.run(new String[] { "--input", inputFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(),
+ "--tempDir", tmpDir.getAbsolutePath(), "--lambda", String.valueOf(lambda),
+ "--numFeatures", String.valueOf(numFeatures), "--numIterations", String.valueOf(numIterations),
+ "--numThreadsPerSolver", String.valueOf(numThreads) });
+
+ Matrix u = MathHelper.readMatrix(conf, new Path(outputDir.getAbsolutePath(), "U/part-m-00000"),
+ preferences.numRows(), numFeatures);
+ Matrix m = MathHelper.readMatrix(conf, new Path(outputDir.getAbsolutePath(), "M/part-m-00000"),
+ preferences.numCols(), numFeatures);
+
+ StringBuilder info = new StringBuilder();
+ info.append("\nA - users x items\n\n");
+ info.append(MathHelper.nice(preferences));
+ info.append("\nU - users x features\n\n");
+ info.append(MathHelper.nice(u));
+ info.append("\nM - items x features\n\n");
+ info.append(MathHelper.nice(m));
+ Matrix Ak = u.times(m.transpose());
+ info.append("\nAk - users x items\n\n");
+ info.append(MathHelper.nice(Ak));
+ info.append('\n');
+
+ log.info(info.toString());
+
+ RunningAverage avg = new FullRunningAverage();
+ for (MatrixSlice slice : preferences) {
+ for (Element e : slice.nonZeroes()) {
+ if (!Double.isNaN(e.get())) {
+ double pref = e.get();
+ double estimate = u.viewRow(slice.index()).dot(m.viewRow(e.index()));
+ double err = pref - estimate;
+ avg.addDatum(err * err);
+ log.info("Comparing preference of user [{}] towards item [{}], was [{}] estimate is [{}]",
+ slice.index(), e.index(), pref, estimate);
+ }
+ }
+ }
+ double rmse = Math.sqrt(avg.getAverage());
+ log.info("RMSE: {}", rmse);
+
+ assertTrue(rmse < 0.2);
+ }
+
+ @Test
+ public void completeJobImplicitToyExample() throws Exception {
+ implicitExample(1);
+ }
+
+ @Test
+ public void completeJobImplicitToyExampleMultithreaded() throws Exception {
+ implicitExample(2);
+ }
+
+ public void implicitExample(int numThreads) throws Exception {
+ Matrix observations = new SparseRowMatrix(4, 4, new Vector[] {
+ new DenseVector(new double[] { 5.0, 5.0, 2.0, 0 }),
+ new DenseVector(new double[] { 2.0, 0, 3.0, 5.0 }),
+ new DenseVector(new double[] { 0, 5.0, 0, 3.0 }),
+ new DenseVector(new double[] { 3.0, 0, 0, 5.0 }) });
+
+ Matrix preferences = new SparseRowMatrix(4, 4, new Vector[] {
+ new DenseVector(new double[] { 1.0, 1.0, 1.0, 0 }),
+ new DenseVector(new double[] { 1.0, 0, 1.0, 1.0 }),
+ new DenseVector(new double[] { 0, 1.0, 0, 1.0 }),
+ new DenseVector(new double[] { 1.0, 0, 0, 1.0 }) });
+
+ writeLines(inputFile, preferencesAsText(observations));
+
+ ParallelALSFactorizationJob alsFactorization = new ParallelALSFactorizationJob();
+ alsFactorization.setConf(conf);
+
+ int numFeatures = 3;
+ int numIterations = 5;
+ double lambda = 0.065;
+ double alpha = 20;
+
+ alsFactorization.run(new String[] { "--input", inputFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(),
+ "--tempDir", tmpDir.getAbsolutePath(), "--lambda", String.valueOf(lambda),
+ "--implicitFeedback", String.valueOf(true), "--alpha", String.valueOf(alpha),
+ "--numFeatures", String.valueOf(numFeatures), "--numIterations", String.valueOf(numIterations),
+ "--numThreadsPerSolver", String.valueOf(numThreads) });
+
+ Matrix u = MathHelper.readMatrix(conf, new Path(outputDir.getAbsolutePath(), "U/part-m-00000"),
+ observations.numRows(), numFeatures);
+ Matrix m = MathHelper.readMatrix(conf, new Path(outputDir.getAbsolutePath(), "M/part-m-00000"),
+ observations.numCols(), numFeatures);
+
+ StringBuilder info = new StringBuilder();
+ info.append("\nObservations - users x items\n");
+ info.append(MathHelper.nice(observations));
+ info.append("\nA - users x items\n\n");
+ info.append(MathHelper.nice(preferences));
+ info.append("\nU - users x features\n\n");
+ info.append(MathHelper.nice(u));
+ info.append("\nM - items x features\n\n");
+ info.append(MathHelper.nice(m));
+ Matrix Ak = u.times(m.transpose());
+ info.append("\nAk - users x items\n\n");
+ info.append(MathHelper.nice(Ak));
+ info.append('\n');
+
+ log.info(info.toString());
+
+ RunningAverage avg = new FullRunningAverage();
+ for (MatrixSlice slice : preferences) {
+ for (Element e : slice.nonZeroes()) {
+ if (!Double.isNaN(e.get())) {
+ double pref = e.get();
+ double estimate = u.viewRow(slice.index()).dot(m.viewRow(e.index()));
+ double confidence = 1 + alpha * observations.getQuick(slice.index(), e.index());
+ double err = confidence * (pref - estimate) * (pref - estimate);
+ avg.addDatum(err);
+ log.info("Comparing preference of user [{}] towards item [{}], was [{}] with confidence [{}] "
+ + "estimate is [{}]", slice.index(), e.index(), pref, confidence, estimate);
+ }
+ }
+ }
+ double rmse = Math.sqrt(avg.getAverage());
+ log.info("RMSE: {}", rmse);
+
+ assertTrue(rmse < 0.4);
+ }
+
+ @Test
+ public void exampleWithIDMapping() throws Exception {
+
+ String[] preferencesWithLongIDs = {
+ "5568227754922264005,-4758971626494767444,5.0",
+ "5568227754922264005,3688396615879561990,5.0",
+ "5568227754922264005,4594226737871995304,2.0",
+ "550945997885173934,-4758971626494767444,2.0",
+ "550945997885173934,4594226737871995304,3.0",
+ "550945997885173934,706816485922781596,5.0",
+ "2448095297482319463,3688396615879561990,5.0",
+ "2448095297482319463,706816485922781596,3.0",
+ "6839920411763636962,-4758971626494767444,3.0",
+ "6839920411763636962,706816485922781596,5.0" };
+
+ writeLines(inputFile, preferencesWithLongIDs);
+
+ ParallelALSFactorizationJob alsFactorization = new ParallelALSFactorizationJob();
+ alsFactorization.setConf(conf);
+
+ int numFeatures = 3;
+ int numIterations = 5;
+ double lambda = 0.065;
+
+ alsFactorization.run(new String[] { "--input", inputFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(),
+ "--tempDir", tmpDir.getAbsolutePath(), "--lambda", String.valueOf(lambda),
+ "--numFeatures", String.valueOf(numFeatures), "--numIterations", String.valueOf(numIterations),
+ "--numThreadsPerSolver", String.valueOf(1), "--usesLongIDs", String.valueOf(true) });
+
+
+ OpenIntLongHashMap userIDIndex =
+ TasteHadoopUtils.readIDIndexMap(outputDir.getAbsolutePath() + "/userIDIndex/part-r-00000", conf);
+ assertEquals(4, userIDIndex.size());
+
+ OpenIntLongHashMap itemIDIndex =
+ TasteHadoopUtils.readIDIndexMap(outputDir.getAbsolutePath() + "/itemIDIndex/part-r-00000", conf);
+ assertEquals(4, itemIDIndex.size());
+
+ OpenIntObjectHashMap<Vector> u =
+ MathHelper.readMatrixRows(conf, new Path(outputDir.getAbsolutePath(), "U/part-m-00000"));
+ OpenIntObjectHashMap<Vector> m =
+ MathHelper.readMatrixRows(conf, new Path(outputDir.getAbsolutePath(), "M/part-m-00000"));
+
+ assertEquals(4, u.size());
+ assertEquals(4, m.size());
+
+ RunningAverage avg = new FullRunningAverage();
+ for (String line : preferencesWithLongIDs) {
+ String[] tokens = TasteHadoopUtils.splitPrefTokens(line);
+ long userID = Long.parseLong(tokens[TasteHadoopUtils.USER_ID_POS]);
+ long itemID = Long.parseLong(tokens[TasteHadoopUtils.ITEM_ID_POS]);
+ double rating = Double.parseDouble(tokens[2]);
+
+ Vector userFeatures = u.get(TasteHadoopUtils.idToIndex(userID));
+ Vector itemFeatures = m.get(TasteHadoopUtils.idToIndex(itemID));
+
+ double estimate = userFeatures.dot(itemFeatures);
+
+ double err = rating - estimate;
+ avg.addDatum(err * err);
+ }
+
+ double rmse = Math.sqrt(avg.getAverage());
+ log.info("RMSE: {}", rmse);
+
+ assertTrue(rmse < 0.2);
+ }
+
+ protected static String preferencesAsText(Matrix preferences) {
+ StringBuilder prefsAsText = new StringBuilder();
+ String separator = "";
+ for (MatrixSlice slice : preferences) {
+ for (Element e : slice.nonZeroes()) {
+ if (!Double.isNaN(e.get())) {
+ prefsAsText.append(separator)
+ .append(slice.index()).append(',').append(e.index()).append(',').append(e.get());
+ separator = "\n";
+ }
+ }
+ }
+ System.out.println(prefsAsText.toString());
+ return prefsAsText.toString();
+ }
+
+ @Test
+ public void recommenderJobWithIDMapping() throws Exception {
+
+ String[] preferencesWithLongIDs = {
+ "5568227754922264005,-4758971626494767444,5.0",
+ "5568227754922264005,3688396615879561990,5.0",
+ "5568227754922264005,4594226737871995304,2.0",
+ "550945997885173934,-4758971626494767444,2.0",
+ "550945997885173934,4594226737871995304,3.0",
+ "550945997885173934,706816485922781596,5.0",
+ "2448095297482319463,3688396615879561990,5.0",
+ "2448095297482319463,706816485922781596,3.0",
+ "6839920411763636962,-4758971626494767444,3.0",
+ "6839920411763636962,706816485922781596,5.0" };
+
+ writeLines(inputFile, preferencesWithLongIDs);
+
+ ParallelALSFactorizationJob alsFactorization = new ParallelALSFactorizationJob();
+ alsFactorization.setConf(conf);
+
+ int numFeatures = 3;
+ int numIterations = 5;
+ double lambda = 0.065;
+
+ Configuration conf = getConfiguration();
+
+ int success = ToolRunner.run(alsFactorization, new String[] {
+ "-Dhadoop.tmp.dir=" + conf.get("hadoop.tmp.dir"),
+ "--input", inputFile.getAbsolutePath(),
+ "--output", intermediateDir.getAbsolutePath(),
+ "--tempDir", tmpDir.getAbsolutePath(),
+ "--lambda", String.valueOf(lambda),
+ "--numFeatures", String.valueOf(numFeatures),
+ "--numIterations", String.valueOf(numIterations),
+ "--numThreadsPerSolver", String.valueOf(1),
+ "--usesLongIDs", String.valueOf(true) });
+
+ assertEquals(0, success);
+
+ // reset as we run in the same JVM
+ SharingMapper.reset();
+
+ RecommenderJob recommender = new RecommenderJob();
+
+ success = ToolRunner.run(recommender, new String[] {
+ "-Dhadoop.tmp.dir=" + conf.get("hadoop.tmp.dir"),
+ "--input", intermediateDir.getAbsolutePath() + "/userRatings/",
+ "--userFeatures", intermediateDir.getAbsolutePath() + "/U/",
+ "--itemFeatures", intermediateDir.getAbsolutePath() + "/M/",
+ "--numRecommendations", String.valueOf(2),
+ "--maxRating", String.valueOf(5.0),
+ "--numThreads", String.valueOf(2),
+ "--usesLongIDs", String.valueOf(true),
+ "--userIDIndex", intermediateDir.getAbsolutePath() + "/userIDIndex/",
+ "--itemIDIndex", intermediateDir.getAbsolutePath() + "/itemIDIndex/",
+ "--output", outputDir.getAbsolutePath() });
+
+ assertEquals(0, success);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/item/IDReaderTest.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/item/IDReaderTest.java b/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/item/IDReaderTest.java
new file mode 100644
index 0000000..650ca98
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/cf/taste/hadoop/item/IDReaderTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.cf.taste.impl.TasteTestCase;
+import org.junit.Test;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+
+public class IDReaderTest extends TasteTestCase {
+
+ static final String USER_ITEM_FILTER_FIELD = "userItemFilter";
+
+ @Test
+ public void testUserItemFilter() throws Exception {
+ Configuration conf = getConfiguration();
+ IDReader idReader = new IDReader(conf);
+ Map<Long, FastIDSet> userItemFilter = Maps.newHashMap();
+
+ long user1 = 1;
+ long user2 = 2;
+
+ idReader.addUserAndItemIdToUserItemFilter(userItemFilter, user1, 100L);
+ idReader.addUserAndItemIdToUserItemFilter(userItemFilter, user1, 200L);
+ idReader.addUserAndItemIdToUserItemFilter(userItemFilter, user2, 300L);
+
+ FastIDSet userIds = IDReader.extractAllUserIdsFromUserItemFilter(userItemFilter);
+
+ assertEquals(2, userIds.size());
+ assertTrue(userIds.contains(user1));
+ assertTrue(userIds.contains(user2));
+
+ setField(idReader, USER_ITEM_FILTER_FIELD, userItemFilter);
+
+ FastIDSet itemsForUser1 = idReader.getItemsToRecommendForUser(user1);
+ assertEquals(2, itemsForUser1.size());
+ assertTrue(itemsForUser1.contains(100L));
+ assertTrue(itemsForUser1.contains(200L));
+
+ FastIDSet itemsForUser2 = idReader.getItemsToRecommendForUser(user2);
+ assertEquals(1, itemsForUser2.size());
+ assertTrue(itemsForUser2.contains(300L));
+
+ FastIDSet itemsForNonExistingUser = idReader.getItemsToRecommendForUser(3L);
+ assertTrue(itemsForNonExistingUser.isEmpty());
+ }
+
+}