You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/27 13:14:46 UTC
[20/24] mahout git commit: MAHOUT-2034 Split MR and New Examples into
separate modules
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java
new file mode 100644
index 0000000..752bb48
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.email;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.math.VarIntWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Converts mail archives (see {@link org.apache.mahout.text.SequenceFilesFromMailArchives}) into a preference
+ * file that can be consumed by the {@link org.apache.mahout.cf.taste.hadoop.item.RecommenderJob}.
+ * <p/>
+ * The input is assumed to be a SequenceFile whose key is {@code filename/message id} and whose value is a list of
+ * fields (separated by a separator of the user's choosing) containing the sender ("from") email and any references.
+ * <p/>
+ * The job runs in three phases:
+ * <ol>
+ *   <li>build a dictionary from message ids to ints, written as {@code msgIds-dictionary-N} chunks;</li>
+ *   <li>build a dictionary from sender addresses to ints, written as {@code fromIds-dictionary-N} chunks;</li>
+ *   <li>for every (sender chunk, message-id chunk) pair run {@link MailToRecMapper} / {@link MailToRecReducer}
+ *       and move the text output into {@code recInput/chunk-i-j-k}.</li>
+ * </ol>
+ * Dictionary ids start at 1. Each output line is {@code fromId,msgId}, or {@code fromId,msgId,count} when counts
+ * are used. The dictionaries double as side tables mapping the ids back to the original strings.
+ * This class currently does not account for thread hijacking.
+ */
+public final class MailToPrefsDriver extends AbstractJob {
+
+  private static final Logger log = LoggerFactory.getLogger(MailToPrefsDriver.class);
+
+  private static final String OUTPUT_FILES_PATTERN = "part-*";
+  private static final int DICTIONARY_BYTE_OVERHEAD = 4;
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new MailToPrefsDriver(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption("chunkSize", "cs", "The size of chunks to write. Default is 100 mb", "100");
+    addOption("separator", "sep", "The separator used in the input file to separate to, from, subject. Default is \\n",
+        "\n");
+    addOption("from", "f", "The position in the input text (value) where the from email is located, starting from "
+        + "zero (0).", "0");
+    addOption("refs", "r", "The position in the input text (value) where the reference ids are located, "
+        + "starting from zero (0).", "1");
+    addOption(buildOption("useCounts", "u", "If set, then use the number of times the user has interacted with a "
+        + "thread as an indication of their preference. Otherwise, use boolean preferences.", false, false,
+        String.valueOf(true)));
+    Map<String, List<String>> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      // NOTE(review): AbstractJob.parseArguments is expected to return null for --help or unparsable arguments;
+      // bail out instead of handing a null map to shouldRunNextPhase.
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    int chunkSize = Integer.parseInt(getOption("chunkSize"));
+    String separator = getOption("separator");
+    Configuration conf = getConf();
+    boolean useCounts = hasOption("useCounts");
+    boolean overwrite = hasOption(DefaultOptionCreator.OVERWRITE_OPTION);
+    AtomicInteger currentPhase = new AtomicInteger();
+    // msgDim[0] receives the message-id dictionary dimension (largest assigned id + 1)
+    int[] msgDim = new int[1];
+    //TODO: mod this to not do so many passes over the data. Dictionary creation could probably be a chain mapper
+
+    // Phase 1: create the dictionary between message ids and ints
+    List<Path> msgIdChunks = null;
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      //TODO: there seems to be a pattern emerging for dictionary creation
+      // -- sparse vectors from seq files also has this.
+      Path msgIdsPath = new Path(output, "msgIds");
+      if (overwrite) {
+        HadoopUtil.delete(conf, msgIdsPath);
+      }
+      log.info("Creating Msg Id Dictionary");
+      Job createMsgIdDictionary = prepareJob(input,
+          msgIdsPath,
+          SequenceFileInputFormat.class,
+          MsgIdToDictionaryMapper.class,
+          Text.class,
+          VarIntWritable.class,
+          MailToDictionaryReducer.class,
+          Text.class,
+          VarIntWritable.class,
+          SequenceFileOutputFormat.class);
+
+      boolean succeeded = createMsgIdDictionary.waitForCompletion(true);
+      if (!succeeded) {
+        return -1;
+      }
+      // write out the dictionary at the top level
+      msgIdChunks = createDictionaryChunks(msgIdsPath, output, "msgIds-dictionary-",
+          createMsgIdDictionary.getConfiguration(), chunkSize, msgDim);
+    }
+
+    // Phase 2: create the dictionary between sender email addresses and ints
+    List<Path> fromChunks = null;
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Path fromIdsPath = new Path(output, "fromIds");
+      if (overwrite) {
+        HadoopUtil.delete(conf, fromIdsPath);
+      }
+      log.info("Creating From Id Dictionary");
+      Job createFromIdDictionary = prepareJob(input,
+          fromIdsPath,
+          SequenceFileInputFormat.class,
+          FromEmailToDictionaryMapper.class,
+          Text.class,
+          VarIntWritable.class,
+          MailToDictionaryReducer.class,
+          Text.class,
+          VarIntWritable.class,
+          SequenceFileOutputFormat.class);
+      createFromIdDictionary.getConfiguration().set(EmailUtility.SEPARATOR, separator);
+      boolean succeeded = createFromIdDictionary.waitForCompletion(true);
+      if (!succeeded) {
+        return -1;
+      }
+      // write out the dictionary at the top level; the sender dimension is computed but currently unused
+      int[] fromDim = new int[1];
+      fromChunks = createDictionaryChunks(fromIdsPath, output, "fromIds-dictionary-",
+          createFromIdDictionary.getConfiguration(), chunkSize, fromDim);
+    }
+
+    // Phase 3: with both dictionaries available, emit the <fromId, msgId> preferences. This phase only runs when
+    // phases 1 and 2 ran in this same invocation, because it needs their chunk lists.
+    if (shouldRunNextPhase(parsedArgs, currentPhase) && fromChunks != null && msgIdChunks != null) {
+      // may be a way to do this so that we can load the from ids in memory, if they are small enough so that
+      // we don't need the double loop
+      log.info("Creating recommendation matrix");
+      Path vecPath = new Path(output, "recInput");
+      if (overwrite) {
+        HadoopUtil.delete(conf, vecPath);
+      }
+      conf.set(EmailUtility.MSG_ID_DIMENSION, String.valueOf(msgDim[0]));
+      conf.set(EmailUtility.FROM_PREFIX, "fromIds-dictionary-");
+      conf.set(EmailUtility.MSG_IDS_PREFIX, "msgIds-dictionary-");
+      conf.set(EmailUtility.FROM_INDEX, getOption("from"));
+      conf.set(EmailUtility.REFS_INDEX, getOption("refs"));
+      conf.set(EmailUtility.SEPARATOR, separator);
+      conf.set(MailToRecReducer.USE_COUNTS_PREFERENCE, String.valueOf(useCounts));
+      int i = 0;
+      for (Path fromChunk : fromChunks) {
+        // j indexes the message-id chunk within the current sender chunk, so names read tmp-i-j / chunk-i-j-k
+        int j = 0;
+        for (Path idChunk : msgIdChunks) {
+          Path out = new Path(vecPath, "tmp-" + i + '-' + j);
+          // register the current pair of dictionary chunks in the distributed cache for this job
+          DistributedCache.setCacheFiles(new URI[]{fromChunk.toUri(), idChunk.toUri()}, conf);
+          Job createRecMatrix = prepareJob(input, out, SequenceFileInputFormat.class,
+              MailToRecMapper.class, Text.class, LongWritable.class, MailToRecReducer.class, Text.class,
+              NullWritable.class, TextOutputFormat.class);
+          createRecMatrix.getConfiguration().set("mapred.output.compress", "false");
+          boolean succeeded = createRecMatrix.waitForCompletion(true);
+          if (!succeeded) {
+            return -1;
+          }
+          // move the part files up a level (deleteSource = true), then drop the temporary directory
+          FileStatus[] fs = HadoopUtil.getFileStatus(new Path(out, "*"), PathType.GLOB, PathFilters.partFilter(), null,
+              conf);
+          for (int k = 0; k < fs.length; k++) {
+            FileStatus f = fs[k];
+            Path outPath = new Path(vecPath, "chunk-" + i + '-' + j + '-' + k);
+            FileUtil.copy(f.getPath().getFileSystem(conf), f.getPath(), outPath.getFileSystem(conf), outPath, true,
+                overwrite, conf);
+          }
+          HadoopUtil.delete(conf, out);
+          j++;
+        }
+        i++;
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Reads the {@code part-*} SequenceFiles under {@code inputPath} and writes their keys, numbered consecutively
+   * from 1, into {@code Text -> IntWritable} SequenceFile chunks named {@code name + chunkIndex} under
+   * {@code dictionaryPathBase}. A new chunk is started once the estimated size of the current one exceeds
+   * {@code chunkSizeInMegabytes}.
+   *
+   * @param inputPath            directory containing the dictionary job's {@code part-*} output
+   * @param dictionaryPathBase   directory the chunks are written to
+   * @param name                 file name prefix of each chunk
+   * @param baseConf             configuration copied for file system access
+   * @param chunkSizeInMegabytes approximate upper bound on each chunk's size
+   * @param maxTermDimension     out-parameter; element 0 receives the number of ids assigned plus one
+   * @return the paths of the written chunks, in order
+   * @throws IOException if reading the input or writing a chunk fails
+   */
+  private static List<Path> createDictionaryChunks(Path inputPath,
+                                                   Path dictionaryPathBase,
+                                                   String name,
+                                                   Configuration baseConf,
+                                                   int chunkSizeInMegabytes, int[] maxTermDimension)
+    throws IOException {
+    List<Path> chunkPaths = new ArrayList<>();
+
+    Configuration conf = new Configuration(baseConf);
+
+    // The chunks are written under dictionaryPathBase, so resolve the file system from that path rather than
+    // from the input path (they may live on different file systems).
+    FileSystem fs = FileSystem.get(dictionaryPathBase.toUri(), conf);
+
+    long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;
+    int chunkIndex = 0;
+    Path chunkPath = new Path(dictionaryPathBase, name + chunkIndex);
+    chunkPaths.add(chunkPath);
+
+    SequenceFile.Writer dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
+
+    try {
+      long currentChunkSize = 0;
+      Path filesPattern = new Path(inputPath, OUTPUT_FILES_PATTERN);
+      int i = 1; //start at 1, since a miss in the OpenObjectIntHashMap returns a 0
+      for (Pair<Writable, Writable> record
+          : new SequenceFileDirIterable<>(filesPattern, PathType.GLOB, null, null, true, conf)) {
+        if (currentChunkSize > chunkSizeLimit) {
+          Closeables.close(dictWriter, false);
+          chunkIndex++;
+
+          chunkPath = new Path(dictionaryPathBase, name + chunkIndex);
+          chunkPaths.add(chunkPath);
+
+          dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
+          currentChunkSize = 0;
+        }
+
+        Writable key = record.getFirst();
+        // rough size estimate: fixed overhead + 2 bytes per char of the key + 4 bytes for the int value
+        int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8;
+        currentChunkSize += fieldSize;
+        dictWriter.append(key, new IntWritable(i++));
+      }
+      maxTermDimension[0] = i;
+    } finally {
+      Closeables.close(dictWriter, false);
+    }
+
+    return chunkPaths;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java
new file mode 100644
index 0000000..91bbd17
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.email;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.map.OpenObjectIntHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Turns each mail record into a {@code "fromId,msgId"} key with a count of 1, using the sender and message-id
+ * dictionaries loaded through {@code EmailUtility.loadDictionaries} with the configured prefixes.
+ * <p/>
+ * The message id is looked up from the first reference (the thread's original message) when the record has
+ * references, and otherwise from the part of the record key after the last '/'. Dictionary ids start at 1 and a
+ * lookup miss in {@link OpenObjectIntHashMap} yields 0, so a pair is only emitted when both ids are positive.
+ */
+public final class MailToRecMapper extends Mapper<Text, Text, Text, LongWritable> {
+
+  private static final Logger log = LoggerFactory.getLogger(MailToRecMapper.class);
+
+  /** Marks an id that has not been looked up at all. */
+  private static final int NO_ID = Integer.MIN_VALUE;
+
+  private final OpenObjectIntHashMap<String> fromDictionary = new OpenObjectIntHashMap<>();
+  private final OpenObjectIntHashMap<String> msgIdDictionary = new OpenObjectIntHashMap<>();
+  private String separator = "\n";
+  private int fromIdx;
+  private int refsIdx;
+
+  /** REFERENCE: message id looked up via a reference; ORIGINAL: looked up from the record key. */
+  public enum Counters {
+    REFERENCE, ORIGINAL
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    String fromPrefix = conf.get(EmailUtility.FROM_PREFIX);
+    String msgPrefix = conf.get(EmailUtility.MSG_IDS_PREFIX);
+    fromIdx = conf.getInt(EmailUtility.FROM_INDEX, 0);
+    refsIdx = conf.getInt(EmailUtility.REFS_INDEX, 1);
+    EmailUtility.loadDictionaries(conf, fromPrefix, fromDictionary, msgPrefix, msgIdDictionary);
+    log.info("From Dictionary size: {} Msg Id Dictionary size: {}", fromDictionary.size(), msgIdDictionary.size());
+    // keep the "\n" default when no separator is configured instead of overwriting it with null
+    separator = conf.get(EmailUtility.SEPARATOR, separator);
+  }
+
+  @Override
+  protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+    int msgIdKey = NO_ID;
+    int fromKey = NO_ID;
+    String valStr = value.toString();
+    String[] splits = StringUtils.splitByWholeSeparatorPreserveAllTokens(valStr, separator);
+
+    if (splits != null && splits.length > 0) {
+      // guard the sender lookup with its own index (guarding it with refsIdx could skip a present sender, or
+      // index past the end of splits when fromIdx > refsIdx)
+      if (splits.length > fromIdx) {
+        String from = EmailUtility.cleanUpEmailAddress(splits[fromIdx]);
+        fromKey = fromDictionary.get(from);
+      }
+      // get the references
+      if (splits.length > refsIdx) {
+        String[] theRefs = EmailUtility.parseReferences(splits[refsIdx]);
+        if (theRefs != null && theRefs.length > 0) {
+          // the first reference is the original message id of the thread, so map to that one. A miss here is
+          // deliberately not replaced by the key's own id: with chunked dictionaries the thread root may be
+          // found by the run that loads another message-id chunk.
+          msgIdKey = msgIdDictionary.get(theRefs[0]);
+          context.getCounter(Counters.REFERENCE).increment(1);
+        }
+      }
+    }
+    // we don't have any references, so use the msg id from the key
+    if (msgIdKey == NO_ID) {
+      String keyStr = key.toString();
+      int idx = keyStr.lastIndexOf('/');
+      if (idx != -1) {
+        String msgId = keyStr.substring(idx + 1);
+        msgIdKey = msgIdDictionary.get(msgId);
+        context.getCounter(Counters.ORIGINAL).increment(1);
+      }
+    }
+
+    // 0 is a dictionary miss (e.g. the id lives in a dictionary chunk not loaded by this run) and NO_ID means
+    // nothing was looked up; emitting either would produce bogus preferences
+    if (msgIdKey > 0 && fromKey > 0) {
+      context.write(new Text(fromKey + "," + msgIdKey), new LongWritable(1));
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecReducer.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecReducer.java
new file mode 100644
index 0000000..ee36a41
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecReducer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.email;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+/**
+ * Collapses the {@code "fromId,msgId"} keys emitted by {@link MailToRecMapper} into one preference line per key.
+ * <p/>
+ * When {@link #USE_COUNTS_PREFERENCE} is true (the default) each line is {@code fromId,msgId,count}, where count is
+ * the sum of the values received for the key; otherwise it is just {@code fromId,msgId} (boolean preference).
+ */
+public class MailToRecReducer extends Reducer<Text, LongWritable, Text, NullWritable> {
+  /**
+   * Configuration key selecting whether to output interaction counts ({@code true}) or boolean preferences
+   * ({@code false}). Note that despite the key's literal value, {@code true} means counts are written.
+   */
+  public static final String USE_COUNTS_PREFERENCE = "useBooleanPreferences";
+
+  // if true, append the summed interaction count to each output line
+  private boolean useCounts = true;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    useCounts = context.getConfiguration().getBoolean(USE_COUNTS_PREFERENCE, true);
+  }
+
+  @Override
+  protected void reduce(Text key, Iterable<LongWritable> values, Context context)
+    throws IOException, InterruptedException {
+    if (useCounts) {
+      // sum the values instead of counting them, so the result stays correct if values are pre-aggregated
+      long sum = 0;
+      for (LongWritable value : values) {
+        sum += value.get();
+      }
+      context.write(new Text(key.toString() + ',' + sum), NullWritable.get());
+    } else {
+      context.write(new Text(key.toString()), NullWritable.get());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java
new file mode 100644
index 0000000..f3de847
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.email;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarIntWritable;
+
+import java.io.IOException;
+
+/**
+ * Emits each message id found in the input keys with a count of 1; the driver pairs this mapper with
+ * {@code MailToDictionaryReducer} to build the message-id dictionary.
+ * Assumes the input is in the format created by {@link org.apache.mahout.text.SequenceFilesFromMailArchives}.
+ */
+public final class MsgIdToDictionaryMapper extends Mapper<Text, Text, Text, VarIntWritable> {
+
+  @Override
+  protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+    // keys look like: /201008/AANLkTikvVnhNH+Y5AGEwqd2=u0CFv2mCm0ce6E6oBnj1@mail.gmail.com
+    String path = key.toString();
+    int at = path.lastIndexOf('@');
+    if (at < 0) {
+      // no '@' anywhere in the key: there is no message id to extract
+      context.getCounter(EmailUtility.Counters.NO_MESSAGE_ID).increment(1);
+      return;
+    }
+    // the message id is everything after the last '/' that precedes the last '@'
+    int slash = path.lastIndexOf('/', at);
+    String msgId = path.substring(slash + 1);
+    if (EmailUtility.WHITESPACE.matcher(msgId).matches()) {
+      context.getCounter(EmailUtility.Counters.NO_MESSAGE_ID).increment(1);
+      return;
+    }
+    context.write(new Text(msgId), new VarIntWritable(1));
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterable.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterable.java
new file mode 100644
index 0000000..c358021
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterable.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.common.Pair;
+
+public final class DataFileIterable implements Iterable<Pair<PreferenceArray,long[]>> {
+
+ private final File dataFile;
+
+ public DataFileIterable(File dataFile) {
+ this.dataFile = dataFile;
+ }
+
+ @Override
+ public Iterator<Pair<PreferenceArray, long[]>> iterator() {
+ try {
+ return new DataFileIterator(dataFile);
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterator.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterator.java
new file mode 100644
index 0000000..786e080
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterator.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+import org.apache.mahout.cf.taste.impl.common.SkippingIterator;
+import org.apache.mahout.cf.taste.impl.model.GenericUserPreferenceArray;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.common.iterator.FileLineIterator;
+import org.apache.mahout.common.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>An {@link java.util.Iterator} which iterates over any of the KDD Cup's rating files. These include the files
+ * {train,test,validation}Idx{1,2}.txt. See http://kddcup.yahoo.com/. Each element in the iteration corresponds
+ * to one user's ratings as a {@link PreferenceArray} and corresponding timestamps as a parallel {@code long}
+ * array.</p>
+ *
+ * <p>As parsed here, each user starts with a {@code userID|ratingsCount} line, followed by {@code ratingsCount}
+ * tab-separated data lines: {@code itemID [preference] [days time]}.</p>
+ *
+ * <p>Timestamps in the data set are relative to some unknown point in time, for anonymity. They are assumed
+ * to be relative to the epoch, time 0, or January 1 1970, for purposes here.</p>
+ */
+public final class DataFileIterator
+    extends AbstractIterator<Pair<PreferenceArray,long[]>>
+    implements SkippingIterator<Pair<PreferenceArray,long[]>>, Closeable {
+
+  private static final Pattern COLON_PATTERN = Pattern.compile(":");
+  private static final Pattern PIPE_PATTERN = Pattern.compile("\\|");
+  private static final Pattern TAB_PATTERN = Pattern.compile("\t");
+
+  private final FileLineIterator lineIterator;
+
+  private static final Logger log = LoggerFactory.getLogger(DataFileIterator.class);
+
+  public DataFileIterator(File dataFile) throws IOException {
+    if (dataFile == null || dataFile.isDirectory() || !dataFile.exists()) {
+      throw new IllegalArgumentException("Bad data file: " + dataFile);
+    }
+    lineIterator = new FileLineIterator(dataFile);
+  }
+
+  @Override
+  protected Pair<PreferenceArray, long[]> computeNext() {
+
+    if (!lineIterator.hasNext()) {
+      return endOfData();
+    }
+
+    String line = lineIterator.next();
+    // First a userID|ratingsCount line
+    String[] tokens = PIPE_PATTERN.split(line);
+
+    long userID = Long.parseLong(tokens[0]);
+    int ratingsLeftToRead = Integer.parseInt(tokens[1]);
+    int ratingsRead = 0;
+
+    PreferenceArray currentUserPrefs = new GenericUserPreferenceArray(ratingsLeftToRead);
+    long[] timestamps = new long[ratingsLeftToRead];
+
+    while (ratingsLeftToRead > 0) {
+
+      line = lineIterator.next();
+
+      // Then a data line. May be 1-4 tokens depending on whether preference info is included (it's not in test data)
+      // or whether date info is included (not included in track 2). Item ID is always first, and date is the last
+      // two fields if it exists.
+      tokens = TAB_PATTERN.split(line);
+      boolean hasPref = tokens.length == 2 || tokens.length == 4;
+      boolean hasDate = tokens.length > 2;
+
+      long itemID = Long.parseLong(tokens[0]);
+
+      currentUserPrefs.setUserID(0, userID);
+      currentUserPrefs.setItemID(ratingsRead, itemID);
+      if (hasPref) {
+        float preference = Float.parseFloat(tokens[1]);
+        currentUserPrefs.setValue(ratingsRead, preference);
+      }
+
+      if (hasDate) {
+        long timestamp;
+        if (hasPref) {
+          timestamp = parseFakeTimestamp(tokens[2], tokens[3]);
+        } else {
+          timestamp = parseFakeTimestamp(tokens[1], tokens[2]);
+        }
+        timestamps[ratingsRead] = timestamp;
+      }
+
+      ratingsRead++;
+      ratingsLeftToRead--;
+    }
+
+    return new Pair<>(currentUserPrefs, timestamps);
+  }
+
+  /**
+   * Skips the next {@code n} users (each header line plus its data lines), stopping early at end of file.
+   */
+  @Override
+  public void skip(int n) {
+    for (int i = 0; i < n; i++) {
+      if (lineIterator.hasNext()) {
+        String line = lineIterator.next();
+        // First a userID|ratingsCount line
+        String[] tokens = PIPE_PATTERN.split(line);
+        int linesToSkip = Integer.parseInt(tokens[1]);
+        lineIterator.skip(linesToSkip);
+      } else {
+        break;
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    endOfData();
+    try {
+      Closeables.close(lineIterator, true);
+    } catch (IOException e) {
+      log.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * @param dateString "date" in days since some undisclosed date, which we will arbitrarily assume to be the
+   *                   epoch, January 1 1970.
+   * @param timeString time of day in HH:mm:ss format
+   * @return the UNIX timestamp (in seconds) for this moment in time
+   */
+  private static long parseFakeTimestamp(String dateString, CharSequence timeString) {
+    int days = Integer.parseInt(dateString);
+    String[] timeTokens = COLON_PATTERN.split(timeString);
+    int hours = Integer.parseInt(timeTokens[0]);
+    int minutes = Integer.parseInt(timeTokens[1]);
+    int seconds = Integer.parseInt(timeTokens[2]);
+    // each unit is scaled to seconds (previously hours were added unscaled plus a constant 3600)
+    return 86400L * days + 3600L * hours + 60L * minutes + seconds;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/KDDCupDataModel.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/KDDCupDataModel.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/KDDCupDataModel.java
new file mode 100644
index 0000000..4b62050
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/KDDCupDataModel.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import com.google.common.base.Preconditions;
+import org.apache.mahout.cf.taste.common.Refreshable;
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.impl.common.FastByIDMap;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
+import org.apache.mahout.cf.taste.impl.model.GenericDataModel;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.SamplingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>An {@link DataModel} which reads into memory any of the KDD Cup's rating files; it is really
+ * meant for use with training data in the files trainIdx{1,2}}.txt.
+ * See http://kddcup.yahoo.com/.</p>
+ *
+ * <p>Timestamps in the data set are relative to some unknown point in time, for anonymity. They are assumed
+ * to be relative to the epoch, time 0, or January 1 1970, for purposes here.</p>
+ */
+public final class KDDCupDataModel implements DataModel {
+
+ private static final Logger log = LoggerFactory.getLogger(KDDCupDataModel.class);
+
+ private final File dataFileDirectory;
+ private final DataModel delegate;
+
+ /**
+ * @param dataFile training rating file
+ */
+ public KDDCupDataModel(File dataFile) throws IOException {
+ this(dataFile, false, 1.0);
+ }
+
+ /**
+ * @param dataFile training rating file
+ * @param storeDates if true, dates are parsed and stored, otherwise not
+ * @param samplingRate percentage of users to keep; can be used to reduce memory requirements
+ */
+ public KDDCupDataModel(File dataFile, boolean storeDates, double samplingRate) throws IOException {
+
+ Preconditions.checkArgument(!Double.isNaN(samplingRate) && samplingRate > 0.0 && samplingRate <= 1.0,
+ "Must be: 0.0 < samplingRate <= 1.0");
+
+ dataFileDirectory = dataFile.getParentFile();
+
+ Iterator<Pair<PreferenceArray,long[]>> dataIterator = new DataFileIterator(dataFile);
+ if (samplingRate < 1.0) {
+ dataIterator = new SamplingIterator<>(dataIterator, samplingRate);
+ }
+
+ FastByIDMap<PreferenceArray> userData = new FastByIDMap<>();
+ FastByIDMap<FastByIDMap<Long>> timestamps = new FastByIDMap<>();
+
+ while (dataIterator.hasNext()) {
+
+ Pair<PreferenceArray,long[]> pair = dataIterator.next();
+ PreferenceArray userPrefs = pair.getFirst();
+ long[] timestampsForPrefs = pair.getSecond();
+
+ userData.put(userPrefs.getUserID(0), userPrefs);
+ if (storeDates) {
+ FastByIDMap<Long> itemTimestamps = new FastByIDMap<>();
+ for (int i = 0; i < timestampsForPrefs.length; i++) {
+ long timestamp = timestampsForPrefs[i];
+ if (timestamp > 0L) {
+ itemTimestamps.put(userPrefs.getItemID(i), timestamp);
+ }
+ }
+ }
+
+ }
+
+ if (storeDates) {
+ delegate = new GenericDataModel(userData, timestamps);
+ } else {
+ delegate = new GenericDataModel(userData);
+ }
+
+ Runtime runtime = Runtime.getRuntime();
+ log.info("Loaded data model in about {}MB heap", (runtime.totalMemory() - runtime.freeMemory()) / 1000000);
+ }
+
+ public File getDataFileDirectory() {
+ return dataFileDirectory;
+ }
+
+ public static File getTrainingFile(File dataFileDirectory) {
+ return getFile(dataFileDirectory, "trainIdx");
+ }
+
+ public static File getValidationFile(File dataFileDirectory) {
+ return getFile(dataFileDirectory, "validationIdx");
+ }
+
+ public static File getTestFile(File dataFileDirectory) {
+ return getFile(dataFileDirectory, "testIdx");
+ }
+
+ public static File getTrackFile(File dataFileDirectory) {
+ return getFile(dataFileDirectory, "trackData");
+ }
+
+ private static File getFile(File dataFileDirectory, String prefix) {
+ // Works on set 1 or 2
+ for (int set : new int[] {1,2}) {
+ // Works on sample data from before contest or real data
+ for (String firstLinesOrNot : new String[] {"", ".firstLines"}) {
+ for (String gzippedOrNot : new String[] {".gz", ""}) {
+ File dataFile = new File(dataFileDirectory, prefix + set + firstLinesOrNot + ".txt" + gzippedOrNot);
+ if (dataFile.exists()) {
+ return dataFile;
+ }
+ }
+ }
+ }
+ throw new IllegalArgumentException("Can't find " + prefix + " file in " + dataFileDirectory);
+ }
+
+ @Override
+ public LongPrimitiveIterator getUserIDs() throws TasteException {
+ return delegate.getUserIDs();
+ }
+
+ @Override
+ public PreferenceArray getPreferencesFromUser(long userID) throws TasteException {
+ return delegate.getPreferencesFromUser(userID);
+ }
+
+ @Override
+ public FastIDSet getItemIDsFromUser(long userID) throws TasteException {
+ return delegate.getItemIDsFromUser(userID);
+ }
+
+ @Override
+ public LongPrimitiveIterator getItemIDs() throws TasteException {
+ return delegate.getItemIDs();
+ }
+
+ @Override
+ public PreferenceArray getPreferencesForItem(long itemID) throws TasteException {
+ return delegate.getPreferencesForItem(itemID);
+ }
+
+ @Override
+ public Float getPreferenceValue(long userID, long itemID) throws TasteException {
+ return delegate.getPreferenceValue(userID, itemID);
+ }
+
+ @Override
+ public Long getPreferenceTime(long userID, long itemID) throws TasteException {
+ return delegate.getPreferenceTime(userID, itemID);
+ }
+
+ @Override
+ public int getNumItems() throws TasteException {
+ return delegate.getNumItems();
+ }
+
+ @Override
+ public int getNumUsers() throws TasteException {
+ return delegate.getNumUsers();
+ }
+
+ @Override
+ public int getNumUsersWithPreferenceFor(long itemID) throws TasteException {
+ return delegate.getNumUsersWithPreferenceFor(itemID);
+ }
+
+ @Override
+ public int getNumUsersWithPreferenceFor(long itemID1, long itemID2) throws TasteException {
+ return delegate.getNumUsersWithPreferenceFor(itemID1, itemID2);
+ }
+
+ @Override
+ public void setPreference(long userID, long itemID, float value) throws TasteException {
+ delegate.setPreference(userID, itemID, value);
+ }
+
+ @Override
+ public void removePreference(long userID, long itemID) throws TasteException {
+ delegate.removePreference(userID, itemID);
+ }
+
+ @Override
+ public boolean hasPreferenceValues() {
+ return delegate.hasPreferenceValues();
+ }
+
+ @Override
+ public float getMaxPreference() {
+ return 100.0f;
+ }
+
+ @Override
+ public float getMinPreference() {
+ return 0.0f;
+ }
+
+ @Override
+ public void refresh(Collection<Refreshable> alreadyRefreshed) {
+ // do nothing
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/ToCSV.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/ToCSV.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/ToCSV.java
new file mode 100644
index 0000000..3f4a732
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/ToCSV.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup;
+
+import org.apache.commons.io.Charsets;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.common.Pair;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * <p>This class converts a KDD Cup input file into a compressed CSV format. The output format is
+ * {@code userID,itemID,score,timestamp}. It can optionally restrict its output to exclude
+ * score and/or timestamp.</p>
+ *
+ * <p>Run as: {@code ToCSV (input file) (output file) [num columns to output]}</p>
+ */
+public final class ToCSV {
+
+ private ToCSV() {
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ File inputFile = new File(args[0]);
+ File outputFile = new File(args[1]);
+ int columnsToOutput = 4;
+ if (args.length >= 3) {
+ columnsToOutput = Integer.parseInt(args[2]);
+ }
+
+ OutputStream outStream = new GZIPOutputStream(new FileOutputStream(outputFile));
+
+ try (Writer outWriter = new BufferedWriter(new OutputStreamWriter(outStream, Charsets.UTF_8))){
+ for (Pair<PreferenceArray,long[]> user : new DataFileIterable(inputFile)) {
+ PreferenceArray prefs = user.getFirst();
+ long[] timestamps = user.getSecond();
+ for (int i = 0; i < prefs.length(); i++) {
+ outWriter.write(String.valueOf(prefs.getUserID(i)));
+ outWriter.write(',');
+ outWriter.write(String.valueOf(prefs.getItemID(i)));
+ if (columnsToOutput > 2) {
+ outWriter.write(',');
+ outWriter.write(String.valueOf(prefs.getValue(i)));
+ }
+ if (columnsToOutput > 3) {
+ outWriter.write(',');
+ outWriter.write(String.valueOf(timestamps[i]));
+ }
+ outWriter.write('\n');
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/EstimateConverter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/EstimateConverter.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/EstimateConverter.java
new file mode 100644
index 0000000..0112ab9
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/EstimateConverter.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encodes a preference estimate as a single byte for KDD Cup Track 1 output.
+ */
+public final class EstimateConverter {
+
+  private static final Logger log = LoggerFactory.getLogger(EstimateConverter.class);
+
+  private EstimateConverter() {
+  }
+
+  /**
+   * Multiplies {@code estimate} by 2.55, truncates toward zero, clamps to [0, 255] and narrows to a byte.
+   * A NaN estimate is logged and encoded as {@code 0x7F}.
+   *
+   * @param estimate estimated preference; presumably on the 0-100 rating scale — TODO confirm
+   * @param userID user the estimate belongs to (only used in the warning message)
+   * @param itemID item the estimate belongs to (only used in the warning message)
+   * @return the byte-encoded estimate
+   */
+  public static byte convert(double estimate, long userID, long itemID) {
+    if (!Double.isNaN(estimate)) {
+      int truncated = (int) (estimate * 2.55);
+      int bounded = Math.min(255, Math.max(0, truncated));
+      return (byte) bounded;
+    }
+    log.warn("Unable to compute estimate for user {}, item {}", userID, itemID);
+    return 0x7F;
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Callable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Callable.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Callable.java
new file mode 100644
index 0000000..72056da
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Callable.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mahout.cf.taste.common.NoSuchItemException;
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.cf.taste.recommender.Recommender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Estimates a preference for every item in one user's test set and encodes each estimate as a byte
+ * using {@link EstimateConverter}.
+ */
+final class Track1Callable implements Callable<byte[]> {
+
+  private static final Logger log = LoggerFactory.getLogger(Track1Callable.class);
+  /** Users completed across all instances; used only for progress logging. */
+  private static final AtomicInteger COUNT = new AtomicInteger();
+
+  private final Recommender recommender;
+  private final PreferenceArray userTest;
+
+  Track1Callable(Recommender recommender, PreferenceArray userTest) {
+    this.recommender = recommender;
+    this.userTest = userTest;
+  }
+
+  /**
+   * @return one encoded estimate per entry of {@code userTest}, in the same order; entries for items the
+   *  recommender does not know are left as 0
+   * @throws TasteException if the recommender fails other than with {@link NoSuchItemException}
+   */
+  @Override
+  public byte[] call() throws TasteException {
+    int numItems = userTest.length();
+    long userID = userTest.getUserID(0);
+    byte[] result = new byte[numItems];
+    for (int i = 0; i < numItems; i++) {
+      long itemID = userTest.getItemID(i);
+      double estimate;
+      try {
+        estimate = recommender.estimatePreference(userID, itemID);
+      } catch (NoSuchItemException nsie) {
+        // OK in the sample data provided before the contest, should never happen otherwise
+        log.warn("Unknown item {}; OK unless this is the real contest data", itemID);
+        continue;
+      }
+      result[i] = EstimateConverter.convert(estimate, userID, itemID);
+    }
+
+    // Use the value returned by incrementAndGet(); re-reading COUNT could observe other threads' increments
+    int completed = COUNT.incrementAndGet();
+    if (completed % 10000 == 0) {
+      log.info("Completed {} users", completed);
+    }
+
+    return result;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Recommender.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Recommender.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Recommender.java
new file mode 100644
index 0000000..067daf5
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Recommender.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.mahout.cf.taste.common.Refreshable;
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.impl.recommender.GenericItemBasedRecommender;
+import org.apache.mahout.cf.taste.impl.similarity.UncenteredCosineSimilarity;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.apache.mahout.cf.taste.recommender.IDRescorer;
+import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.cf.taste.recommender.Recommender;
+import org.apache.mahout.cf.taste.similarity.ItemSimilarity;
+
+/**
+ * {@link Recommender} used for KDD Cup Track 1. It wraps a {@link GenericItemBasedRecommender} that uses
+ * {@link UncenteredCosineSimilarity}, and forwards every call to it.
+ */
+public final class Track1Recommender implements Recommender {
+
+  private final Recommender delegate;
+
+  public Track1Recommender(DataModel dataModel) throws TasteException {
+    // Change this to whatever you like!
+    ItemSimilarity itemSimilarity = new UncenteredCosineSimilarity(dataModel);
+    this.delegate = new GenericItemBasedRecommender(dataModel, itemSimilarity);
+  }
+
+  @Override
+  public List<RecommendedItem> recommend(long userID, int howMany) throws TasteException {
+    return delegate.recommend(userID, howMany);
+  }
+
+  @Override
+  public List<RecommendedItem> recommend(long userID, int howMany, boolean includeKnownItems) throws TasteException {
+    // no rescorer
+    return delegate.recommend(userID, howMany, null, includeKnownItems);
+  }
+
+  @Override
+  public List<RecommendedItem> recommend(long userID, int howMany, IDRescorer rescorer) throws TasteException {
+    // known items are excluded
+    return delegate.recommend(userID, howMany, rescorer, false);
+  }
+
+  @Override
+  public List<RecommendedItem> recommend(long userID, int howMany, IDRescorer rescorer, boolean includeKnownItems)
+      throws TasteException {
+    return delegate.recommend(userID, howMany, rescorer, includeKnownItems);
+  }
+
+  @Override
+  public float estimatePreference(long userID, long itemID) throws TasteException {
+    return delegate.estimatePreference(userID, itemID);
+  }
+
+  @Override
+  public void setPreference(long userID, long itemID, float value) throws TasteException {
+    delegate.setPreference(userID, itemID, value);
+  }
+
+  @Override
+  public void removePreference(long userID, long itemID) throws TasteException {
+    delegate.removePreference(userID, itemID);
+  }
+
+  @Override
+  public DataModel getDataModel() {
+    return delegate.getDataModel();
+  }
+
+  @Override
+  public void refresh(Collection<Refreshable> alreadyRefreshed) {
+    delegate.refresh(alreadyRefreshed);
+  }
+
+  @Override
+  public String toString() {
+    return "Track1Recommender[recommender:" + delegate + ']';
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderBuilder.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderBuilder.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderBuilder.java
new file mode 100644
index 0000000..6b9fe1b
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderBuilder.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1;
+
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.apache.mahout.cf.taste.recommender.Recommender;
+
+final class Track1RecommenderBuilder implements RecommenderBuilder {
+
+ @Override
+ public Recommender buildRecommender(DataModel dataModel) throws TasteException {
+ return new Track1Recommender(dataModel);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluator.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluator.java
new file mode 100644
index 0000000..bcd0a3d
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluator.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Lists;
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.eval.DataModelBuilder;
+import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
+import org.apache.mahout.cf.taste.example.kddcup.DataFileIterable;
+import org.apache.mahout.cf.taste.example.kddcup.KDDCupDataModel;
+import org.apache.mahout.cf.taste.impl.common.FullRunningAverage;
+import org.apache.mahout.cf.taste.impl.common.FullRunningAverageAndStdDev;
+import org.apache.mahout.cf.taste.impl.common.RunningAverage;
+import org.apache.mahout.cf.taste.impl.common.RunningAverageAndStdDev;
+import org.apache.mahout.cf.taste.impl.eval.AbstractDifferenceRecommenderEvaluator;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.apache.mahout.cf.taste.model.Preference;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.cf.taste.recommender.Recommender;
+import org.apache.mahout.common.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Attempts to run an evaluation just like that dictated for Yahoo's KDD Cup, Track 1.
+ * It will compute the RMSE of a validation data set against the predicted ratings from
+ * the training data set.
+ */
+public final class Track1RecommenderEvaluator extends AbstractDifferenceRecommenderEvaluator {
+
+  private static final Logger log = LoggerFactory.getLogger(Track1RecommenderEvaluator.class);
+
+  /** Running mean of squared estimation errors for the evaluation in progress. */
+  private RunningAverage average;
+  private final File dataFileDirectory;
+
+  /**
+   * @param dataFileDirectory directory in which the KDD Cup validation file is looked up
+   */
+  public Track1RecommenderEvaluator(File dataFileDirectory) {
+    setMaxPreference(100.0f);
+    setMinPreference(0.0f);
+    average = new FullRunningAverage();
+    this.dataFileDirectory = dataFileDirectory;
+  }
+
+  /**
+   * Builds a recommender over all of {@code dataModel} and returns the RMSE of its estimates against the
+   * preferences in the validation file of {@link #dataFileDirectory}. {@code dataModelBuilder},
+   * {@code trainingPercentage} and {@code evaluationPercentage} are ignored. Every call starts from a fresh
+   * error accumulator, so repeated calls do not mix results.
+   *
+   * @throws TasteException if building the recommender or estimating preferences fails
+   */
+  @Override
+  public double evaluate(RecommenderBuilder recommenderBuilder,
+                         DataModelBuilder dataModelBuilder,
+                         DataModel dataModel,
+                         double trainingPercentage,
+                         double evaluationPercentage) throws TasteException {
+
+    // Discard squared errors accumulated by any previous evaluate() call
+    reset();
+
+    Recommender recommender = recommenderBuilder.buildRecommender(dataModel);
+
+    Collection<Callable<Void>> estimateCallables = Lists.newArrayList();
+    AtomicInteger noEstimateCounter = new AtomicInteger();
+    for (Pair<PreferenceArray,long[]> userData
+        : new DataFileIterable(KDDCupDataModel.getValidationFile(dataFileDirectory))) {
+      PreferenceArray validationPrefs = userData.getFirst();
+      long userID = validationPrefs.get(0).getUserID();
+      estimateCallables.add(
+          new PreferenceEstimateCallable(recommender, userID, validationPrefs, noEstimateCounter));
+    }
+
+    RunningAverageAndStdDev timing = new FullRunningAverageAndStdDev();
+    execute(estimateCallables, noEstimateCounter, timing);
+
+    double result = computeFinalEvaluation();
+    log.info("Evaluation result: {}", result);
+    return result;
+  }
+
+  // Use RMSE scoring:
+
+  @Override
+  protected void reset() {
+    average = new FullRunningAverage();
+  }
+
+  @Override
+  protected void processOneEstimate(float estimatedPreference, Preference realPref) {
+    double diff = realPref.getValue() - estimatedPreference;
+    average.addDatum(diff * diff);
+  }
+
+  @Override
+  protected double computeFinalEvaluation() {
+    return Math.sqrt(average.getAverage());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluatorRunner.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluatorRunner.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluatorRunner.java
new file mode 100644
index 0000000..deadc00
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluatorRunner.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.cli2.OptionException;
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.example.TasteOptionParser;
+import org.apache.mahout.cf.taste.example.kddcup.KDDCupDataModel;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command-line entry point: evaluates {@link Track1Recommender} on the KDD Cup Track 1 data using
+ * {@link Track1RecommenderEvaluator} and logs the resulting score.
+ */
+public final class Track1RecommenderEvaluatorRunner {
+
+  private static final Logger log = LoggerFactory.getLogger(Track1RecommenderEvaluatorRunner.class);
+
+  private Track1RecommenderEvaluatorRunner() {
+  }
+
+  /**
+   * @param args options parsed by {@link TasteOptionParser}; presumably the ratings option names the
+   *  directory holding the KDD Cup data files — TODO confirm
+   * @throws IllegalArgumentException if no directory is given, or it is missing or not a directory
+   */
+  public static void main(String... args) throws IOException, TasteException, OptionException {
+    File dataDir = TasteOptionParser.getRatings(args);
+    if (dataDir == null) {
+      throw new IllegalArgumentException("No data directory");
+    }
+    boolean usableDirectory = dataDir.exists() && dataDir.isDirectory();
+    if (!usableDirectory) {
+      throw new IllegalArgumentException("Bad data file directory: " + dataDir);
+    }
+
+    Track1RecommenderEvaluator evaluator = new Track1RecommenderEvaluator(dataDir);
+    DataModel trainingModel = new KDDCupDataModel(KDDCupDataModel.getTrainingFile(dataDir));
+    // No DataModelBuilder and NaN percentages: Track1RecommenderEvaluator does not use them
+    double score = evaluator.evaluate(new Track1RecommenderBuilder(), null, trainingModel, Float.NaN, Float.NaN);
+    log.info(String.valueOf(score));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Runner.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Runner.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Runner.java
new file mode 100644
index 0000000..a0ff126
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Runner.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1;
+
+import org.apache.mahout.cf.taste.example.kddcup.DataFileIterable;
+import org.apache.mahout.cf.taste.example.kddcup.KDDCupDataModel;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.common.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * <p>Runs "track 1" of the KDD Cup competition using whatever recommender is inside {@link Track1Recommender}
+ * and attempts to output the result in the correct contest format.</p>
+ *
+ * <p>Run as: {@code Track1Runner [track 1 data file directory] [output file]}</p>
+ */
+public final class Track1Runner {
+
+ private static final Logger log = LoggerFactory.getLogger(Track1Runner.class);
+
+ private Track1Runner() {
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ File dataFileDirectory = new File(args[0]);
+ if (!dataFileDirectory.exists() || !dataFileDirectory.isDirectory()) {
+ throw new IllegalArgumentException("Bad data file directory: " + dataFileDirectory);
+ }
+
+ long start = System.currentTimeMillis();
+
+ KDDCupDataModel model = new KDDCupDataModel(KDDCupDataModel.getTrainingFile(dataFileDirectory));
+ Track1Recommender recommender = new Track1Recommender(model);
+
+ long end = System.currentTimeMillis();
+ log.info("Loaded model in {}s", (end - start) / 1000);
+ start = end;
+
+ Collection<Track1Callable> callables = new ArrayList<>();
+ for (Pair<PreferenceArray,long[]> tests : new DataFileIterable(KDDCupDataModel.getTestFile(dataFileDirectory))) {
+ PreferenceArray userTest = tests.getFirst();
+ callables.add(new Track1Callable(recommender, userTest));
+ }
+
+ int cores = Runtime.getRuntime().availableProcessors();
+ log.info("Running on {} cores", cores);
+ ExecutorService executor = Executors.newFixedThreadPool(cores);
+ List<Future<byte[]>> results = executor.invokeAll(callables);
+ executor.shutdown();
+
+ end = System.currentTimeMillis();
+ log.info("Ran recommendations in {}s", (end - start) / 1000);
+ start = end;
+
+ try (OutputStream out = new BufferedOutputStream(new FileOutputStream(new File(args[1])))){
+ for (Future<byte[]> result : results) {
+ for (byte estimate : result.get()) {
+ out.write(estimate);
+ }
+ }
+ }
+
+ end = System.currentTimeMillis();
+ log.info("Wrote output in {}s", (end - start) / 1000);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/DataModelFactorizablePreferences.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/DataModelFactorizablePreferences.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/DataModelFactorizablePreferences.java
new file mode 100644
index 0000000..022d78c
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/DataModelFactorizablePreferences.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1.svd;
+
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
+import org.apache.mahout.cf.taste.impl.model.GenericPreference;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.apache.mahout.cf.taste.model.Preference;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * can be used to drop {@link DataModel}s into {@link ParallelArraysSGDFactorizer}
+ */
+public class DataModelFactorizablePreferences implements FactorizablePreferences {
+
+ private final FastIDSet userIDs;
+ private final FastIDSet itemIDs;
+
+ private final List<Preference> preferences;
+
+ private final float minPreference;
+ private final float maxPreference;
+
+ public DataModelFactorizablePreferences(DataModel dataModel) {
+
+ minPreference = dataModel.getMinPreference();
+ maxPreference = dataModel.getMaxPreference();
+
+ try {
+ userIDs = new FastIDSet(dataModel.getNumUsers());
+ itemIDs = new FastIDSet(dataModel.getNumItems());
+ preferences = new ArrayList<>();
+
+ LongPrimitiveIterator userIDsIterator = dataModel.getUserIDs();
+ while (userIDsIterator.hasNext()) {
+ long userID = userIDsIterator.nextLong();
+ userIDs.add(userID);
+ for (Preference preference : dataModel.getPreferencesFromUser(userID)) {
+ itemIDs.add(preference.getItemID());
+ preferences.add(new GenericPreference(userID, preference.getItemID(), preference.getValue()));
+ }
+ }
+ } catch (TasteException te) {
+ throw new IllegalStateException("Unable to create factorizable preferences!", te);
+ }
+ }
+
+ @Override
+ public LongPrimitiveIterator getUserIDs() {
+ return userIDs.iterator();
+ }
+
+ @Override
+ public LongPrimitiveIterator getItemIDs() {
+ return itemIDs.iterator();
+ }
+
+ @Override
+ public Iterable<Preference> getPreferences() {
+ return preferences;
+ }
+
+ @Override
+ public float getMinPreference() {
+ return minPreference;
+ }
+
+ @Override
+ public float getMaxPreference() {
+ return maxPreference;
+ }
+
+ @Override
+ public int numUsers() {
+ return userIDs.size();
+ }
+
+ @Override
+ public int numItems() {
+ return itemIDs.size();
+ }
+
+ @Override
+ public int numPreferences() {
+ return preferences.size();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/FactorizablePreferences.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/FactorizablePreferences.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/FactorizablePreferences.java
new file mode 100644
index 0000000..a126dec
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/FactorizablePreferences.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1.svd;
+
+import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
+import org.apache.mahout.cf.taste.model.Preference;
+
+/**
+ * models the necessary input for {@link ParallelArraysSGDFactorizer}
+ */
+public interface FactorizablePreferences {
+
+ LongPrimitiveIterator getUserIDs();
+
+ LongPrimitiveIterator getItemIDs();
+
+ Iterable<Preference> getPreferences();
+
+ float getMinPreference();
+
+ float getMaxPreference();
+
+ int numUsers();
+
+ int numItems();
+
+ int numPreferences();
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/KDDCupFactorizablePreferences.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/KDDCupFactorizablePreferences.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/KDDCupFactorizablePreferences.java
new file mode 100644
index 0000000..6dcef6b
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/KDDCupFactorizablePreferences.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1.svd;
+
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.apache.mahout.cf.taste.example.kddcup.DataFileIterable;
import org.apache.mahout.cf.taste.impl.common.AbstractLongPrimitiveIterator;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.model.Preference;
import org.apache.mahout.cf.taste.model.PreferenceArray;
import org.apache.mahout.common.Pair;

import java.io.File;
import java.util.NoSuchElementException;
+
+public class KDDCupFactorizablePreferences implements FactorizablePreferences {
+
+ private final File dataFile;
+
+ public KDDCupFactorizablePreferences(File dataFile) {
+ this.dataFile = dataFile;
+ }
+
+ @Override
+ public LongPrimitiveIterator getUserIDs() {
+ return new FixedSizeLongIterator(numUsers());
+ }
+
+ @Override
+ public LongPrimitiveIterator getItemIDs() {
+ return new FixedSizeLongIterator(numItems());
+ }
+
+ @Override
+ public Iterable<Preference> getPreferences() {
+ Iterable<Iterable<Preference>> prefIterators =
+ Iterables.transform(new DataFileIterable(dataFile),
+ new Function<Pair<PreferenceArray,long[]>,Iterable<Preference>>() {
+ @Override
+ public Iterable<Preference> apply(Pair<PreferenceArray,long[]> from) {
+ return from.getFirst();
+ }
+ });
+ return Iterables.concat(prefIterators);
+ }
+
+ @Override
+ public float getMinPreference() {
+ return 0;
+ }
+
+ @Override
+ public float getMaxPreference() {
+ return 100;
+ }
+
+ @Override
+ public int numUsers() {
+ return 1000990;
+ }
+
+ @Override
+ public int numItems() {
+ return 624961;
+ }
+
+ @Override
+ public int numPreferences() {
+ return 252800275;
+ }
+
+ static class FixedSizeLongIterator extends AbstractLongPrimitiveIterator {
+
+ private long currentValue;
+ private final long maximum;
+
+ FixedSizeLongIterator(long maximum) {
+ this.maximum = maximum;
+ currentValue = 0;
+ }
+
+ @Override
+ public long nextLong() {
+ return currentValue++;
+ }
+
+ @Override
+ public long peek() {
+ return currentValue;
+ }
+
+ @Override
+ public void skip(int n) {
+ currentValue += n;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currentValue < maximum;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+}