You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/27 13:14:46 UTC

[20/24] mahout git commit: MAHOUT-2034 Split MR and New Examples into seperate modules

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java
new file mode 100644
index 0000000..752bb48
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.email;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.math.VarIntWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Convert the Mail archives (see {@link org.apache.mahout.text.SequenceFilesFromMailArchives}) to a preference
+ * file that can be consumed by the {@link org.apache.mahout.cf.taste.hadoop.item.RecommenderJob}.
+ * <p/>
+ * This assumes the input is a Sequence File, that the key is: filename/message id and the value is a list
+ * (separated by the user's choosing) containing the from email and any references
+ * <p/>
+ * The output is a matrix where either the from or to are the rows (represented as longs) and the columns are the
+ * message ids that the user has interacted with (as a VectorWritable).  This class currently does not account for
+ * thread hijacking.
+ * NOTE(review): the final phase in {@code run} is configured with {@code TextOutputFormat} and
+ * {@code Text}/{@code NullWritable} output types, so the "VectorWritable" wording above may be out of date -- confirm.
+ * <p/>
+ * It also outputs a side table mapping the row ids to their original and the message ids to the message thread id
+ * <p/>
+ * The work is split into three phases, each gated by {@code shouldRunNextPhase}:
+ * <ol>
+ *   <li>build the message id dictionary under {@code <output>/msgIds} and chunk it into
+ *       {@code <output>/msgIds-dictionary-N} files;</li>
+ *   <li>build the from-address dictionary under {@code <output>/fromIds} and chunk it into
+ *       {@code <output>/fromIds-dictionary-N} files;</li>
+ *   <li>run one job per (from chunk, message id chunk) pair and collect the part files as
+ *       {@code <output>/recInput/chunk-i-j-k}.</li>
+ * </ol>
+ */
+public final class MailToPrefsDriver extends AbstractJob {
+
+  private static final Logger log = LoggerFactory.getLogger(MailToPrefsDriver.class);
+
+  // Glob used to pick up the reducer part files of a dictionary job.
+  private static final String OUTPUT_FILES_PATTERN = "part-*";
+  // Fixed per-record allowance used when estimating how large a dictionary chunk has grown.
+  private static final int DICTIONARY_BYTE_OVERHEAD = 4;
+
+  /**
+   * Command-line entry point; delegates to {@link #run(String[])} through {@link ToolRunner}.
+   * NOTE(review): the int returned by {@code ToolRunner.run} is discarded here, so a -1 from {@code run} is not
+   * turned into a process exit status by this method.
+   */
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new MailToPrefsDriver(), args);
+  }
+
+  /**
+   * Registers the command-line options, parses {@code args} and runs whichever of the three phases
+   * {@code shouldRunNextPhase} selects.
+   *
+   * @param args command-line arguments for the options registered at the top of this method
+   * @return 0 when the method runs to the end, -1 as soon as one of the Hadoop jobs reports failure
+   * @throws Exception if any of the steps invoked here throws
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption("chunkSize", "cs", "The size of chunks to write.  Default is 100 mb", "100");
+    addOption("separator", "sep", "The separator used in the input file to separate to, from, subject.  Default is \\n",
+        "\n");
+    addOption("from", "f", "The position in the input text (value) where the from email is located, starting from "
+        + "zero (0).", "0");
+    addOption("refs", "r", "The position in the input text (value) where the reference ids are located, "
+        + "starting from zero (0).", "1");
+    addOption(buildOption("useCounts", "u", "If set, then use the number of times the user has interacted with a "
+        + "thread as an indication of their preference.  Otherwise, use boolean preferences.", false, false,
+        String.valueOf(true)));
+    Map<String, List<String>> parsedArgs = parseArguments(args);
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    int chunkSize = Integer.parseInt(getOption("chunkSize"));
+    String separator = getOption("separator");
+    Configuration conf = getConf();
+    boolean useCounts = hasOption("useCounts");
+    AtomicInteger currentPhase = new AtomicInteger();
+    // single-element array used as an out-parameter: createDictionaryChunks stores the id bound in slot 0
+    int[] msgDim = new int[1];
+    //TODO: mod this to not do so many passes over the data.  Dictionary creation could probably be a chain mapper
+    List<Path> msgIdChunks = null;
+    boolean overwrite = hasOption(DefaultOptionCreator.OVERWRITE_OPTION);
+    // create the dictionary between message ids and longs
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      //TODO: there seems to be a pattern emerging for dictionary creation
+      // -- sparse vectors from seq files also has this.
+      Path msgIdsPath = new Path(output, "msgIds");
+      if (overwrite) {
+        HadoopUtil.delete(conf, msgIdsPath);
+      }
+      log.info("Creating Msg Id Dictionary");
+      Job createMsgIdDictionary = prepareJob(input,
+              msgIdsPath,
+              SequenceFileInputFormat.class,
+              MsgIdToDictionaryMapper.class,
+              Text.class,
+              VarIntWritable.class,
+              MailToDictionaryReducer.class,
+              Text.class,
+              VarIntWritable.class,
+              SequenceFileOutputFormat.class);
+
+      boolean succeeded = createMsgIdDictionary.waitForCompletion(true);
+      if (!succeeded) {
+        return -1;
+      }
+      //write out the dictionary at the top level
+      msgIdChunks = createDictionaryChunks(msgIdsPath, output, "msgIds-dictionary-",
+          createMsgIdDictionary.getConfiguration(), chunkSize, msgDim);
+    }
+    //create the dictionary between from email addresses and longs
+    List<Path> fromChunks = null;
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Path fromIdsPath = new Path(output, "fromIds");
+      if (overwrite) {
+        HadoopUtil.delete(conf, fromIdsPath);
+      }
+      log.info("Creating From Id Dictionary");
+      Job createFromIdDictionary = prepareJob(input,
+              fromIdsPath,
+              SequenceFileInputFormat.class,
+              FromEmailToDictionaryMapper.class,
+              Text.class,
+              VarIntWritable.class,
+              MailToDictionaryReducer.class,
+              Text.class,
+              VarIntWritable.class,
+              SequenceFileOutputFormat.class);
+      // the separator is set on the job's own configuration for this phase
+      createFromIdDictionary.getConfiguration().set(EmailUtility.SEPARATOR, separator);
+      boolean succeeded = createFromIdDictionary.waitForCompletion(true);
+      if (!succeeded) {
+        return -1;
+      }
+      //write out the dictionary at the top level
+      // fromDim is filled in by createDictionaryChunks but is not read afterwards in this method
+      int[] fromDim = new int[1];
+      fromChunks = createDictionaryChunks(fromIdsPath, output, "fromIds-dictionary-",
+          createFromIdDictionary.getConfiguration(), chunkSize, fromDim);
+    }
+    //OK, we have our dictionaries, let's output the real thing we need: <from_id -> <msgId, msgId, msgId, ...>>
+    // Besides the phase check, this needs both chunk lists, which are only populated by the two phases above;
+    // if either of those phases was skipped in this invocation, this phase silently does nothing.
+    if (shouldRunNextPhase(parsedArgs, currentPhase) && fromChunks != null && msgIdChunks != null) {
+      //Job map
+      //may be a way to do this so that we can load the from ids in memory, if they are small enough so that
+      // we don't need the double loop
+      log.info("Creating recommendation matrix");
+      Path vecPath = new Path(output, "recInput");
+      if (overwrite) {
+        HadoopUtil.delete(conf, vecPath);
+      }
+      //conf.set(EmailUtility.FROM_DIMENSION, String.valueOf(fromDim[0]));
+      conf.set(EmailUtility.MSG_ID_DIMENSION, String.valueOf(msgDim[0]));
+      conf.set(EmailUtility.FROM_PREFIX, "fromIds-dictionary-");
+      conf.set(EmailUtility.MSG_IDS_PREFIX, "msgIds-dictionary-");
+      conf.set(EmailUtility.FROM_INDEX, getOption("from"));
+      conf.set(EmailUtility.REFS_INDEX, getOption("refs"));
+      conf.set(EmailUtility.SEPARATOR, separator);
+      conf.set(MailToRecReducer.USE_COUNTS_PREFERENCE, String.valueOf(useCounts));
+      // i indexes the from chunk; j is NOT reset per from chunk, so it counts jobs across the whole double loop.
+      // The generated "tmp-i-j" / "chunk-i-j-k" names are unique either way.
+      int j = 0;
+      int i = 0;
+      for (Path fromChunk : fromChunks) {
+        for (Path idChunk : msgIdChunks) {
+          Path out = new Path(vecPath, "tmp-" + i + '-' + j);
+          // register exactly this pair of dictionary chunks as cache files before the job is prepared
+          DistributedCache.setCacheFiles(new URI[]{fromChunk.toUri(), idChunk.toUri()}, conf);
+          Job createRecMatrix = prepareJob(input, out, SequenceFileInputFormat.class,
+                  MailToRecMapper.class, Text.class, LongWritable.class, MailToRecReducer.class, Text.class,
+                  NullWritable.class, TextOutputFormat.class);
+          createRecMatrix.getConfiguration().set("mapred.output.compress", "false");
+          boolean succeeded = createRecMatrix.waitForCompletion(true);
+          if (!succeeded) {
+            return -1;
+          }
+          //copy the results up a level
+          //HadoopUtil.copyMergeSeqFiles(out.getFileSystem(conf), out, vecPath.getFileSystem(conf), outPath, true,
+          // conf, "");
+          FileStatus[] fs = HadoopUtil.getFileStatus(new Path(out, "*"), PathType.GLOB, PathFilters.partFilter(), null,
+              conf);
+          // each part file of this job becomes <output>/recInput/chunk-i-j-k
+          for (int k = 0; k < fs.length; k++) {
+            FileStatus f = fs[k];
+            Path outPath = new Path(vecPath, "chunk-" + i + '-' + j + '-' + k);
+            FileUtil.copy(f.getPath().getFileSystem(conf), f.getPath(), outPath.getFileSystem(conf), outPath, true,
+                overwrite, conf);
+          }
+          // the per-job temporary directory is removed once its part files have been copied
+          HadoopUtil.delete(conf, out);
+          j++;
+        }
+        i++;
+      }
+      //concat the files together
+      /*Path mergePath = new Path(output, "vectors.dat");
+      if (overwrite) {
+        HadoopUtil.delete(conf, mergePath);
+      }
+      log.info("Merging together output vectors to vectors.dat in {}", output);*/
+      //HadoopUtil.copyMergeSeqFiles(vecPath.getFileSystem(conf), vecPath, mergePath.getFileSystem(conf), mergePath,
+      // false, conf, "\n");
+    }
+
+    return 0;
+  }
+
+  /**
+   * Reads every record of the {@code part-*} files under {@code inputPath} and rewrites the keys as a dictionary
+   * ({@code key -> sequential int id}, ids starting at 1), split over one or more sequence files named
+   * {@code name + chunkIndex} under {@code dictionaryPathBase}.
+   *
+   * @param inputPath directory holding the {@code part-*} sequence files to read
+   * @param dictionaryPathBase directory in which the chunk files are created
+   * @param name file name prefix of the chunk files; the chunk index is appended to it
+   * @param baseConf configuration that is copied and used for file system access and reading/writing
+   * @param chunkSizeInMegabytes estimated size after which a new chunk file is started
+   * @param maxTermDimension out-parameter; slot 0 receives the next unused id, i.e. the number of records
+   *                         written plus one
+   * @return the paths of all chunk files, in creation order (always at least one)
+   * @throws IOException declared for the file system and sequence file operations used here
+   */
+  private static List<Path> createDictionaryChunks(Path inputPath,
+                                                   Path dictionaryPathBase,
+                                                   String name,
+                                                   Configuration baseConf,
+                                                   int chunkSizeInMegabytes, int[] maxTermDimension)
+    throws IOException {
+    List<Path> chunkPaths = new ArrayList<>();
+
+    Configuration conf = new Configuration(baseConf);
+
+    FileSystem fs = FileSystem.get(inputPath.toUri(), conf);
+
+    long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;
+    int chunkIndex = 0;
+    Path chunkPath = new Path(dictionaryPathBase, name + chunkIndex);
+    chunkPaths.add(chunkPath);
+
+    SequenceFile.Writer dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
+
+    try {
+      long currentChunkSize = 0;
+      Path filesPattern = new Path(inputPath, OUTPUT_FILES_PATTERN);
+      int i = 1; //start at 1, since a miss in the OpenObjectIntHashMap returns a 0
+      for (Pair<Writable, Writable> record
+              : new SequenceFileDirIterable<>(filesPattern, PathType.GLOB, null, null, true, conf)) {
+        // the limit is checked before appending, so a chunk can overshoot the limit by the records
+        // written since it was last below it
+        if (currentChunkSize > chunkSizeLimit) {
+          Closeables.close(dictWriter, false);
+          chunkIndex++;
+
+          chunkPath = new Path(dictionaryPathBase, name + chunkIndex);
+          chunkPaths.add(chunkPath);
+
+          dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
+          currentChunkSize = 0;
+        }
+
+        Writable key = record.getFirst();
+        // size estimate per record: fixed overhead + 2 bytes per key character + 4 bytes for the int id
+        int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8;
+        currentChunkSize += fieldSize;
+        dictWriter.append(key, new IntWritable(i++));
+      }
+      maxTermDimension[0] = i;
+    } finally {
+      // closes whichever writer is current, also when iteration or appending failed
+      Closeables.close(dictWriter, false);
+    }
+
+    return chunkPaths;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java
new file mode 100644
index 0000000..91bbd17
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.email;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.map.OpenObjectIntHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Maps one mail record to a {@code "<fromId>,<msgId>"} key with the value 1.
+ * <p/>
+ * The from address is taken from position {@code fromIdx} of the separator-split value and translated through the
+ * from dictionary. The message id is the first reference found at position {@code refsIdx}, or, when the record
+ * has no references, the part of the input key after its last {@code '/'}; it is translated through the message id
+ * dictionary. Both dictionaries are loaded in {@link #setup}.
+ * <p/>
+ * NOTE(review): a dictionary lookup that misses presumably yields 0 (the driver starts its ids at 1 for that
+ * reason), and such 0 ids are still emitted here -- confirm whether they should be skipped instead.
+ */
+public final class MailToRecMapper extends Mapper<Text, Text, Text, LongWritable> {
+
+  private static final Logger log = LoggerFactory.getLogger(MailToRecMapper.class);
+
+  private final OpenObjectIntHashMap<String> fromDictionary = new OpenObjectIntHashMap<>();
+  private final OpenObjectIntHashMap<String> msgIdDictionary = new OpenObjectIntHashMap<>();
+  private String separator = "\n";
+  private int fromIdx;
+  private int refsIdx;
+
+  /** Counts how many records were keyed by a referenced message id versus by their own message id. */
+  public enum Counters {
+    REFERENCE, ORIGINAL
+  }
+
+  /**
+   * Reads the field positions and separator from the job configuration and loads both dictionaries.
+   *
+   * @param context the task context supplying the configuration
+   */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    String fromPrefix = conf.get(EmailUtility.FROM_PREFIX);
+    String msgPrefix = conf.get(EmailUtility.MSG_IDS_PREFIX);
+    fromIdx = conf.getInt(EmailUtility.FROM_INDEX, 0);
+    refsIdx = conf.getInt(EmailUtility.REFS_INDEX, 1);
+    EmailUtility.loadDictionaries(conf, fromPrefix, fromDictionary, msgPrefix, msgIdDictionary);
+    log.info("From Dictionary size: {} Msg Id Dictionary size: {}", fromDictionary.size(), msgIdDictionary.size());
+    // keep the "\n" default when the key is absent instead of replacing it with null
+    separator = conf.get(EmailUtility.SEPARATOR, separator);
+  }
+
+  /**
+   * Emits {@code ("<fromId>,<msgId>", 1)} when both ids could be determined; otherwise emits nothing.
+   *
+   * @param key the record key; the text after its last {@code '/'} is used as the message id when the record
+   *            has no references
+   * @param value separator-delimited fields containing the from address and the references
+   * @param context the task context used for counters and output
+   */
+  @Override
+  protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+    // Integer.MIN_VALUE marks "not determined"
+    int msgIdKey = Integer.MIN_VALUE;
+    int fromKey = Integer.MIN_VALUE;
+
+    String[] splits = StringUtils.splitByWholeSeparatorPreserveAllTokens(value.toString(), separator);
+
+    if (splits != null && splits.length > 0) {
+      // bounds-check against the index that is actually read (the original tested refsIdx here)
+      if (splits.length > fromIdx) {
+        String from = EmailUtility.cleanUpEmailAddress(splits[fromIdx]);
+        fromKey = fromDictionary.get(from);
+      }
+      //get the references
+      if (splits.length > refsIdx) {
+        String[] theRefs = EmailUtility.parseReferences(splits[refsIdx]);
+        if (theRefs != null && theRefs.length > 0) {
+          //we have a reference, the first one is the original message id, so map to that one if it exists
+          msgIdKey = msgIdDictionary.get(theRefs[0]);
+          context.getCounter(Counters.REFERENCE).increment(1);
+        }
+      }
+    }
+    //we don't have any references, so use the msg id
+    if (msgIdKey == Integer.MIN_VALUE) {
+      //get the msg id and the from and output the associated ids
+      String keyStr = key.toString();
+      int idx = keyStr.lastIndexOf('/');
+      if (idx != -1) {
+        String msgId = keyStr.substring(idx + 1);
+        msgIdKey = msgIdDictionary.get(msgId);
+        context.getCounter(Counters.ORIGINAL).increment(1);
+      }
+    }
+
+    if (msgIdKey != Integer.MIN_VALUE && fromKey != Integer.MIN_VALUE) {
+      context.write(new Text(fromKey + "," + msgIdKey), new LongWritable(1));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecReducer.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecReducer.java
new file mode 100644
index 0000000..ee36a41
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecReducer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.email;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+/**
+ * Collapses all values received for one {@code "<fromId>,<msgId>"} key into a single output line: either the key
+ * followed by {@code ",<total>"} (counts mode) or the key alone (boolean mode). The value side of the output is
+ * always {@link NullWritable}.
+ */
+public class MailToRecReducer extends Reducer<Text, LongWritable, Text, NullWritable> {
+  /**
+   * We can either ignore how many times the user interacted (boolean) or output the number of times they interacted.
+   * Note that, despite the literal key name, a configured value of {@code true} selects counts mode.
+   */
+  public static final String USE_COUNTS_PREFERENCE = "useBooleanPreferences";
+
+  //if true, then output weight
+  private boolean useCounts = true;
+
+  /**
+   * Reads the counts/boolean switch from the job configuration; counts mode is the default.
+   *
+   * @param context the task context supplying the configuration
+   */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    useCounts = context.getConfiguration().getBoolean(USE_COUNTS_PREFERENCE, true);
+  }
+
+  /**
+   * Writes one line for {@code key}.
+   *
+   * @param key the {@code "<fromId>,<msgId>"} pair
+   * @param values the interaction counts recorded for the pair
+   * @param context the task context used for output
+   */
+  @Override
+  protected void reduce(Text key, Iterable<LongWritable> values, Context context)
+    throws IOException, InterruptedException {
+    if (useCounts) {
+      long sum = 0;
+      for (LongWritable value : values) {
+        // add the carried count rather than counting records, so pre-aggregated values stay correct
+        sum += value.get();
+      }
+      context.write(new Text(key.toString() + ',' + sum), NullWritable.get());
+    } else {
+      context.write(new Text(key.toString()), NullWritable.get());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java
new file mode 100644
index 0000000..f3de847
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.email;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarIntWritable;
+
+import java.io.IOException;
+
+/**
+ * Emits {@code (message id, 1)} for every input record whose key contains a usable message id, and bumps the
+ * {@code NO_MESSAGE_ID} counter otherwise. The input is expected to be in the format created by
+ * {@link org.apache.mahout.text.SequenceFilesFromMailArchives}.
+ */
+public final class MsgIdToDictionaryMapper extends Mapper<Text, Text, Text, VarIntWritable> {
+
+  @Override
+  protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+    // the key carries the message id, e.g. /201008/AANLkTikvVnhNH+Y5AGEwqd2=u0CFv2mCm0ce6E6oBnj1@mail.gmail.com
+    String path = key.toString();
+    int lastAt = path.lastIndexOf('@');
+    if (lastAt == -1) {
+      // no '@' anywhere: nothing that looks like a message id
+      context.getCounter(EmailUtility.Counters.NO_MESSAGE_ID).increment(1);
+      return;
+    }
+
+    // the id is everything after the last slash that precedes the last '@'
+    // (the whole key when there is no such slash)
+    int precedingSlash = path.lastIndexOf('/', lastAt);
+    String msgId = path.substring(precedingSlash + 1);
+
+    boolean matchesWhitespace = EmailUtility.WHITESPACE.matcher(msgId).matches();
+    if (matchesWhitespace) {
+      context.getCounter(EmailUtility.Counters.NO_MESSAGE_ID).increment(1);
+      return;
+    }
+    context.write(new Text(msgId), new VarIntWritable(1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterable.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterable.java
new file mode 100644
index 0000000..c358021
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterable.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.common.Pair;
+
+public final class DataFileIterable implements Iterable<Pair<PreferenceArray,long[]>> {
+
+  private final File dataFile;
+
+  public DataFileIterable(File dataFile) {
+    this.dataFile = dataFile;
+  }
+
+  @Override
+  public Iterator<Pair<PreferenceArray, long[]>> iterator() {
+    try {
+      return new DataFileIterator(dataFile);
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+  }
+ 
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterator.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterator.java
new file mode 100644
index 0000000..786e080
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterator.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+import org.apache.mahout.cf.taste.impl.common.SkippingIterator;
+import org.apache.mahout.cf.taste.impl.model.GenericUserPreferenceArray;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.common.iterator.FileLineIterator;
+import org.apache.mahout.common.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>An {@link java.util.Iterator} which iterates over any of the KDD Cup's rating files. These include the files
+ * {train,test,validation}Idx{1,2}.txt. See http://kddcup.yahoo.com/. Each element in the iteration corresponds
+ * to one user's ratings as a {@link PreferenceArray} and corresponding timestamps as a parallel {@code long}
+ * array.</p>
+ *
+ * <p>Timestamps in the data set are relative to some unknown point in time, for anonymity. They are assumed
+ * to be relative to the epoch, time 0, or January 1 1970, for purposes here.</p>
+ *
+ * <p>The file is read as a sequence of user blocks: one {@code userID|ratingsCount} header line followed by
+ * {@code ratingsCount} tab-separated rating lines.</p>
+ */
+public final class DataFileIterator
+    extends AbstractIterator<Pair<PreferenceArray,long[]>>
+    implements SkippingIterator<Pair<PreferenceArray,long[]>>, Closeable {
+
+  private static final Logger log = LoggerFactory.getLogger(DataFileIterator.class);
+
+  private static final Pattern COLON_PATTERN = Pattern.compile(":");
+  private static final Pattern PIPE_PATTERN = Pattern.compile("\\|");
+  private static final Pattern TAB_PATTERN = Pattern.compile("\t");
+
+  private static final long SECONDS_PER_DAY = 86400L;
+  private static final long SECONDS_PER_HOUR = 3600L;
+  private static final long SECONDS_PER_MINUTE = 60L;
+
+  private final FileLineIterator lineIterator;
+
+  /**
+   * @param dataFile the rating file to read
+   * @throws IllegalArgumentException if {@code dataFile} is null, is a directory or does not exist
+   * @throws IOException if the line iterator over the file cannot be created
+   */
+  public DataFileIterator(File dataFile) throws IOException {
+    if (dataFile == null || dataFile.isDirectory() || !dataFile.exists()) {
+      throw new IllegalArgumentException("Bad data file: " + dataFile);
+    }
+    lineIterator = new FileLineIterator(dataFile);
+  }
+
+  /**
+   * Reads the next user block.
+   *
+   * @return the user's preferences paired with a parallel array of timestamps (entries stay 0 for rating lines
+   *  that carry no date), or the end-of-data marker when no lines are left
+   */
+  @Override
+  protected Pair<PreferenceArray, long[]> computeNext() {
+
+    if (!lineIterator.hasNext()) {
+      return endOfData();
+    }
+
+    String line = lineIterator.next();
+    // First a userID|ratingsCount line
+    String[] tokens = PIPE_PATTERN.split(line);
+
+    long userID = Long.parseLong(tokens[0]);
+    int ratingsLeftToRead = Integer.parseInt(tokens[1]);
+    int ratingsRead = 0;
+
+    PreferenceArray currentUserPrefs = new GenericUserPreferenceArray(ratingsLeftToRead);
+    long[] timestamps = new long[ratingsLeftToRead];
+
+    // The user ID is the same for every rating of the block, so set it once up front. The guard keeps the
+    // previous behavior for users with no ratings, for which it was never set.
+    if (ratingsLeftToRead > 0) {
+      currentUserPrefs.setUserID(0, userID);
+    }
+
+    while (ratingsLeftToRead > 0) {
+
+      line = lineIterator.next();
+
+      // Then a data line. May be 1-4 tokens depending on whether preference info is included (it's not in test data)
+      // or whether date info is included (not included in track 2). Item ID is always first, and date is the last
+      // two fields if it exists.
+      tokens = TAB_PATTERN.split(line);
+      boolean hasPref = tokens.length == 2 || tokens.length == 4;
+      boolean hasDate = tokens.length > 2;
+
+      long itemID = Long.parseLong(tokens[0]);
+
+      currentUserPrefs.setItemID(ratingsRead, itemID);
+      if (hasPref) {
+        float preference = Float.parseFloat(tokens[1]);
+        currentUserPrefs.setValue(ratingsRead, preference);
+      }
+
+      if (hasDate) {
+        // the date and time are the last two fields; their position shifts by one when a preference is present
+        long timestamp;
+        if (hasPref) {
+          timestamp = parseFakeTimestamp(tokens[2], tokens[3]);
+        } else {
+          timestamp = parseFakeTimestamp(tokens[1], tokens[2]);
+        }
+        timestamps[ratingsRead] = timestamp;
+      }
+
+      ratingsRead++;
+      ratingsLeftToRead--;
+    }
+
+    return new Pair<>(currentUserPrefs, timestamps);
+  }
+
+  /**
+   * Skips up to {@code n} user blocks without parsing their rating lines; stops early when the file ends.
+   *
+   * @param n number of user blocks to skip
+   */
+  @Override
+  public void skip(int n) {
+    for (int i = 0; i < n; i++) {
+      if (lineIterator.hasNext()) {
+        String line = lineIterator.next();
+        // First a userID|ratingsCount line
+        String[] tokens = PIPE_PATTERN.split(line);
+        int linesToSkip = Integer.parseInt(tokens[1]);
+        lineIterator.skip(linesToSkip);
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Marks the iteration as finished and closes the underlying line iterator; an {@link IOException} on close is
+   * logged rather than rethrown.
+   */
+  @Override
+  public void close() {
+    endOfData();
+    try {
+      Closeables.close(lineIterator, true);
+    } catch (IOException e) {
+      log.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * @param dateString "date" in days since some undisclosed date, which we will arbitrarily assume to be the
+   *  epoch, January 1 1970.
+   * @param timeString time of day in HH:mm:ss format
+   * @return the UNIX timestamp, in seconds, for this moment in time
+   */
+  private static long parseFakeTimestamp(String dateString, CharSequence timeString) {
+    int days = Integer.parseInt(dateString);
+    String[] timeTokens = COLON_PATTERN.split(timeString);
+    int hours = Integer.parseInt(timeTokens[0]);
+    int minutes = Integer.parseInt(timeTokens[1]);
+    int seconds = Integer.parseInt(timeTokens[2]);
+    // hours are multiplied by 3600 (the original added a constant 3600 plus the raw hour value)
+    return SECONDS_PER_DAY * days + SECONDS_PER_HOUR * hours + SECONDS_PER_MINUTE * minutes + seconds;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/KDDCupDataModel.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/KDDCupDataModel.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/KDDCupDataModel.java
new file mode 100644
index 0000000..4b62050
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/KDDCupDataModel.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import com.google.common.base.Preconditions;
+import org.apache.mahout.cf.taste.common.Refreshable;
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.impl.common.FastByIDMap;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
+import org.apache.mahout.cf.taste.impl.model.GenericDataModel;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.SamplingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>A {@link DataModel} which reads into memory any of the KDD Cup's rating files; it is really
+ * meant for use with training data in the files trainIdx{1,2}.txt.
+ * See http://kddcup.yahoo.com/.</p>
+ *
+ * <p>Timestamps in the data set are relative to some unknown point in time, for anonymity. They are assumed
+ * to be relative to the epoch, time 0, or January 1 1970, for purposes here.</p>
+ *
+ * <p>All {@link DataModel} queries are forwarded to an in-memory {@link GenericDataModel} that is built
+ * once, in the constructor.</p>
+ */
+public final class KDDCupDataModel implements DataModel {
+
+  private static final Logger log = LoggerFactory.getLogger(KDDCupDataModel.class);
+
+  private final File dataFileDirectory;
+  private final DataModel delegate;
+
+  /**
+   * Loads every user from the file and does not store timestamps.
+   *
+   * @param dataFile training rating file
+   */
+  public KDDCupDataModel(File dataFile) throws IOException {
+    this(dataFile, false, 1.0);
+  }
+
+  /**
+   * @param dataFile training rating file
+   * @param storeDates if true, dates are parsed and stored, otherwise not
+   * @param samplingRate fraction of users to keep, in the range (0.0, 1.0]; can be used to reduce
+   *  memory requirements
+   * @throws IllegalArgumentException if {@code samplingRate} is NaN or outside (0.0, 1.0]
+   */
+  public KDDCupDataModel(File dataFile, boolean storeDates, double samplingRate) throws IOException {
+
+    Preconditions.checkArgument(!Double.isNaN(samplingRate) && samplingRate > 0.0 && samplingRate <= 1.0,
+        "Must be: 0.0 < samplingRate <= 1.0");
+
+    dataFileDirectory = dataFile.getParentFile();
+
+    Iterator<Pair<PreferenceArray,long[]>> dataIterator = new DataFileIterator(dataFile);
+    if (samplingRate < 1.0) {
+      dataIterator = new SamplingIterator<>(dataIterator, samplingRate);
+    }
+
+    FastByIDMap<PreferenceArray> userData = new FastByIDMap<>();
+    FastByIDMap<FastByIDMap<Long>> timestamps = new FastByIDMap<>();
+
+    while (dataIterator.hasNext()) {
+
+      Pair<PreferenceArray,long[]> pair = dataIterator.next();
+      PreferenceArray userPrefs = pair.getFirst();
+      long[] timestampsForPrefs = pair.getSecond();
+      long userID = userPrefs.getUserID(0);
+
+      userData.put(userID, userPrefs);
+      if (storeDates) {
+        FastByIDMap<Long> itemTimestamps = new FastByIDMap<>();
+        for (int i = 0; i < timestampsForPrefs.length; i++) {
+          long timestamp = timestampsForPrefs[i];
+          // Only positive values are kept as timestamps
+          if (timestamp > 0L) {
+            itemTimestamps.put(userPrefs.getItemID(i), timestamp);
+          }
+        }
+        // Register this user's map; otherwise the delegate is built with an empty timestamp table
+        // and the parsed dates are silently discarded.
+        timestamps.put(userID, itemTimestamps);
+      }
+
+    }
+
+    if (storeDates) {
+      delegate = new GenericDataModel(userData, timestamps);
+    } else {
+      delegate = new GenericDataModel(userData);
+    }
+
+    Runtime runtime = Runtime.getRuntime();
+    log.info("Loaded data model in about {}MB heap", (runtime.totalMemory() - runtime.freeMemory()) / 1000000);
+  }
+
+  /**
+   * @return the parent directory of the data file this model was loaded from
+   */
+  public File getDataFileDirectory() {
+    return dataFileDirectory;
+  }
+
+  /**
+   * @return the first existing "trainIdx" file in the given directory
+   * @throws IllegalArgumentException if no such file exists
+   */
+  public static File getTrainingFile(File dataFileDirectory) {
+    return getFile(dataFileDirectory, "trainIdx");
+  }
+
+  /**
+   * @return the first existing "validationIdx" file in the given directory
+   * @throws IllegalArgumentException if no such file exists
+   */
+  public static File getValidationFile(File dataFileDirectory) {
+    return getFile(dataFileDirectory, "validationIdx");
+  }
+
+  /**
+   * @return the first existing "testIdx" file in the given directory
+   * @throws IllegalArgumentException if no such file exists
+   */
+  public static File getTestFile(File dataFileDirectory) {
+    return getFile(dataFileDirectory, "testIdx");
+  }
+
+  /**
+   * @return the first existing "trackData" file in the given directory
+   * @throws IllegalArgumentException if no such file exists
+   */
+  public static File getTrackFile(File dataFileDirectory) {
+    return getFile(dataFileDirectory, "trackData");
+  }
+
+  /**
+   * Probes the directory for {@code prefix + set + [.firstLines] + ".txt" + [.gz]} and returns the first
+   * candidate that exists. Set 1 is tried before set 2, the full file before the ".firstLines" sample,
+   * and the gzipped variant before the plain one.
+   */
+  private static File getFile(File dataFileDirectory, String prefix) {
+    // Works on set 1 or 2
+    for (int set : new int[] {1,2}) {
+      // Works on sample data from before contest or real data
+      for (String firstLinesOrNot : new String[] {"", ".firstLines"}) {
+        for (String gzippedOrNot : new String[] {".gz", ""}) {
+          File dataFile = new File(dataFileDirectory, prefix + set + firstLinesOrNot + ".txt" + gzippedOrNot);
+          if (dataFile.exists()) {
+            return dataFile;
+          }
+        }
+      }
+    }
+    throw new IllegalArgumentException("Can't find " + prefix + " file in " + dataFileDirectory);
+  }
+
+  // The DataModel methods below forward straight to the in-memory delegate.
+
+  @Override
+  public LongPrimitiveIterator getUserIDs() throws TasteException {
+    return delegate.getUserIDs();
+  }
+
+  @Override
+  public PreferenceArray getPreferencesFromUser(long userID) throws TasteException {
+    return delegate.getPreferencesFromUser(userID);
+  }
+
+  @Override
+  public FastIDSet getItemIDsFromUser(long userID) throws TasteException {
+    return delegate.getItemIDsFromUser(userID);
+  }
+
+  @Override
+  public LongPrimitiveIterator getItemIDs() throws TasteException {
+    return delegate.getItemIDs();
+  }
+
+  @Override
+  public PreferenceArray getPreferencesForItem(long itemID) throws TasteException {
+    return delegate.getPreferencesForItem(itemID);
+  }
+
+  @Override
+  public Float getPreferenceValue(long userID, long itemID) throws TasteException {
+    return delegate.getPreferenceValue(userID, itemID);
+  }
+
+  @Override
+  public Long getPreferenceTime(long userID, long itemID) throws TasteException {
+    return delegate.getPreferenceTime(userID, itemID);
+  }
+
+  @Override
+  public int getNumItems() throws TasteException {
+    return delegate.getNumItems();
+  }
+
+  @Override
+  public int getNumUsers() throws TasteException {
+    return delegate.getNumUsers();
+  }
+
+  @Override
+  public int getNumUsersWithPreferenceFor(long itemID) throws TasteException {
+    return delegate.getNumUsersWithPreferenceFor(itemID);
+  }
+
+  @Override
+  public int getNumUsersWithPreferenceFor(long itemID1, long itemID2) throws TasteException {
+    return delegate.getNumUsersWithPreferenceFor(itemID1, itemID2);
+  }
+
+  @Override
+  public void setPreference(long userID, long itemID, float value) throws TasteException {
+    delegate.setPreference(userID, itemID, value);
+  }
+
+  @Override
+  public void removePreference(long userID, long itemID) throws TasteException {
+    delegate.removePreference(userID, itemID);
+  }
+
+  @Override
+  public boolean hasPreferenceValues() {
+    return delegate.hasPreferenceValues();
+  }
+
+  /**
+   * @return the fixed upper bound of 100, independent of the loaded data
+   */
+  @Override
+  public float getMaxPreference() {
+    return 100.0f;
+  }
+
+  /**
+   * @return the fixed lower bound of 0, independent of the loaded data
+   */
+  @Override
+  public float getMinPreference() {
+    return 0.0f;
+  }
+
+  @Override
+  public void refresh(Collection<Refreshable> alreadyRefreshed) {
+    // do nothing
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/ToCSV.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/ToCSV.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/ToCSV.java
new file mode 100644
index 0000000..3f4a732
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/ToCSV.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup;
+
+import org.apache.commons.io.Charsets;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.common.Pair;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * <p>This class converts a KDD Cup input file into a compressed CSV format. The output format is
+ * {@code userID,itemID,score,timestamp}. It can optionally restrict its output to exclude
+ * score and/or timestamp.</p>
+ *
+ * <p>Run as: {@code ToCSV (input file) (output file) [num columns to output]}</p>
+ */
+public final class ToCSV {
+
+  private ToCSV() {
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    File inputFile = new File(args[0]);
+    File outputFile = new File(args[1]);
+    int columnsToOutput = 4;
+    if (args.length >= 3) {
+      columnsToOutput = Integer.parseInt(args[2]);
+    }
+
+    OutputStream outStream = new GZIPOutputStream(new FileOutputStream(outputFile));
+
+    try (Writer outWriter = new BufferedWriter(new OutputStreamWriter(outStream, Charsets.UTF_8))){
+      for (Pair<PreferenceArray,long[]> user : new DataFileIterable(inputFile)) {
+        PreferenceArray prefs = user.getFirst();
+        long[] timestamps = user.getSecond();
+        for (int i = 0; i < prefs.length(); i++) {
+          outWriter.write(String.valueOf(prefs.getUserID(i)));
+          outWriter.write(',');
+          outWriter.write(String.valueOf(prefs.getItemID(i)));
+          if (columnsToOutput > 2) {
+            outWriter.write(',');
+            outWriter.write(String.valueOf(prefs.getValue(i)));
+          }
+          if (columnsToOutput > 3) {
+            outWriter.write(',');
+            outWriter.write(String.valueOf(timestamps[i]));
+          }
+          outWriter.write('\n');
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/EstimateConverter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/EstimateConverter.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/EstimateConverter.java
new file mode 100644
index 0000000..0112ab9
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/EstimateConverter.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Turns a preference estimate into a single byte.
+ */
+public final class EstimateConverter {
+
+  private static final Logger log = LoggerFactory.getLogger(EstimateConverter.class);
+
+  private EstimateConverter() {}
+
+  /**
+   * Multiplies the estimate by 2.55, truncates it to an int, clamps that to [0, 255] and narrows it to a
+   * byte, so clamped values above 127 come back negative when read as a signed Java byte.
+   *
+   * @param estimate estimated preference, possibly NaN
+   * @param userID user the estimate belongs to; only used in the warning message
+   * @param itemID item the estimate belongs to; only used in the warning message
+   * @return the encoded estimate, or {@code 0x7F} (after logging a warning) when the estimate is NaN
+   */
+  public static byte convert(double estimate, long userID, long itemID) {
+    if (Double.isNaN(estimate)) {
+      log.warn("Unable to compute estimate for user {}, item {}", userID, itemID);
+      return 0x7F;
+    }
+    int scaled = (int) (estimate * 2.55);
+    int clamped = Math.max(0, Math.min(255, scaled));
+    return (byte) clamped;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Callable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Callable.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Callable.java
new file mode 100644
index 0000000..72056da
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Callable.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mahout.cf.taste.common.NoSuchItemException;
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.cf.taste.recommender.Recommender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Computes the encoded preference estimates for all test items of one user. A counter shared by all
+ * instances tracks how many users have been completed, for progress logging.
+ */
+final class Track1Callable implements Callable<byte[]> {
+
+  private static final Logger log = LoggerFactory.getLogger(Track1Callable.class);
+  private static final AtomicInteger COUNT = new AtomicInteger();
+
+  private final Recommender recommender;
+  private final PreferenceArray userTest;
+
+  Track1Callable(Recommender recommender, PreferenceArray userTest) {
+    this.recommender = recommender;
+    this.userTest = userTest;
+  }
+
+  /**
+   * @return one byte per entry of the user's test preferences, in the same order, produced by
+   *  {@link EstimateConverter#convert(double, long, long)}; entries whose item is unknown to the
+   *  recommender are left at 0
+   * @throws TasteException if the recommender fails for a reason other than an unknown item
+   */
+  @Override
+  public byte[] call() throws TasteException {
+    long userID = userTest.get(0).getUserID();
+    byte[] result = new byte[userTest.length()];
+    for (int i = 0; i < userTest.length(); i++) {
+      long itemID = userTest.getItemID(i);
+      double estimate;
+      try {
+        estimate = recommender.estimatePreference(userID, itemID);
+      } catch (NoSuchItemException nsie) {
+        // OK in the sample data provided before the contest, should never happen otherwise
+        log.warn("Unknown item {}; OK unless this is the real contest data", itemID);
+        continue;
+      }
+      result[i] = EstimateConverter.convert(estimate, userID, itemID);
+    }
+
+    // Use the value returned by the increment itself: a second COUNT.get() may already include
+    // increments from other threads and report a count that is not the multiple of 10000 reached here.
+    int completedUsers = COUNT.incrementAndGet();
+    if (completedUsers % 10000 == 0) {
+      log.info("Completed {} users", completedUsers);
+    }
+
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Recommender.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Recommender.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Recommender.java
new file mode 100644
index 0000000..067daf5
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Recommender.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.mahout.cf.taste.common.Refreshable;
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.impl.recommender.GenericItemBasedRecommender;
+import org.apache.mahout.cf.taste.impl.similarity.UncenteredCosineSimilarity;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.apache.mahout.cf.taste.recommender.IDRescorer;
+import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.cf.taste.recommender.Recommender;
+import org.apache.mahout.cf.taste.similarity.ItemSimilarity;
+
+/**
+ * A {@link Recommender} that forwards every call to an internal {@link GenericItemBasedRecommender}
+ * built on {@link UncenteredCosineSimilarity} over the supplied data model.
+ */
+public final class Track1Recommender implements Recommender {
+
+  private final Recommender recommender;
+
+  /**
+   * @param dataModel model used both for the item similarity and for the wrapped recommender
+   */
+  public Track1Recommender(DataModel dataModel) throws TasteException {
+    // Change this to whatever you like!
+    ItemSimilarity similarity = new UncenteredCosineSimilarity(dataModel);
+    recommender = new GenericItemBasedRecommender(dataModel, similarity);
+  }
+  
+  @Override
+  public List<RecommendedItem> recommend(long userID, int howMany) throws TasteException {
+    return recommender.recommend(userID, howMany);
+  }
+
+  // Routes through this class's four-argument overload, with no rescorer.
+  @Override
+  public List<RecommendedItem> recommend(long userID, int howMany, boolean includeKnownItems) throws TasteException {
+    return recommend(userID, howMany, null, includeKnownItems);
+  }
+
+  // Calls the wrapped recommender's four-argument overload with includeKnownItems set to false.
+  @Override
+  public List<RecommendedItem> recommend(long userID, int howMany, IDRescorer rescorer) throws TasteException {
+    return recommender.recommend(userID, howMany, rescorer, false);
+  }
+  
+  @Override
+  public List<RecommendedItem> recommend(long userID, int howMany, IDRescorer rescorer, boolean includeKnownItems)
+    throws TasteException {
+    return recommender.recommend(userID, howMany, rescorer, includeKnownItems);
+  }
+  
+  @Override
+  public float estimatePreference(long userID, long itemID) throws TasteException {
+    return recommender.estimatePreference(userID, itemID);
+  }
+  
+  @Override
+  public void setPreference(long userID, long itemID, float value) throws TasteException {
+    recommender.setPreference(userID, itemID, value);
+  }
+  
+  @Override
+  public void removePreference(long userID, long itemID) throws TasteException {
+    recommender.removePreference(userID, itemID);
+  }
+  
+  @Override
+  public DataModel getDataModel() {
+    return recommender.getDataModel();
+  }
+  
+  @Override
+  public void refresh(Collection<Refreshable> alreadyRefreshed) {
+    recommender.refresh(alreadyRefreshed);
+  }
+  
+  @Override
+  public String toString() {
+    return "Track1Recommender[recommender:" + recommender + ']';
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderBuilder.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderBuilder.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderBuilder.java
new file mode 100644
index 0000000..6b9fe1b
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderBuilder.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1;
+
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.apache.mahout.cf.taste.recommender.Recommender;
+
+/**
+ * {@link RecommenderBuilder} that always builds a {@link Track1Recommender} over the given data model.
+ */
+final class Track1RecommenderBuilder implements RecommenderBuilder {
+  
+  /**
+   * @param dataModel model handed to the new {@link Track1Recommender}
+   * @return a new {@link Track1Recommender} for {@code dataModel}
+   */
+  @Override
+  public Recommender buildRecommender(DataModel dataModel) throws TasteException {
+    return new Track1Recommender(dataModel);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluator.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluator.java
new file mode 100644
index 0000000..bcd0a3d
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluator.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Lists;
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.eval.DataModelBuilder;
+import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
+import org.apache.mahout.cf.taste.example.kddcup.DataFileIterable;
+import org.apache.mahout.cf.taste.example.kddcup.KDDCupDataModel;
+import org.apache.mahout.cf.taste.impl.common.FullRunningAverage;
+import org.apache.mahout.cf.taste.impl.common.FullRunningAverageAndStdDev;
+import org.apache.mahout.cf.taste.impl.common.RunningAverage;
+import org.apache.mahout.cf.taste.impl.common.RunningAverageAndStdDev;
+import org.apache.mahout.cf.taste.impl.eval.AbstractDifferenceRecommenderEvaluator;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.apache.mahout.cf.taste.model.Preference;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.cf.taste.recommender.Recommender;
+import org.apache.mahout.common.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Attempts to run an evaluation just like that dictated for Yahoo's KDD Cup, Track 1.
+ * It will compute the RMSE of a validation data set against the predicted ratings from
+ * the training data set.
+ */
+public final class Track1RecommenderEvaluator extends AbstractDifferenceRecommenderEvaluator {
+
+  private static final Logger log = LoggerFactory.getLogger(Track1RecommenderEvaluator.class);
+
+  private RunningAverage average;
+  private final File dataFileDirectory;
+
+  public Track1RecommenderEvaluator(File dataFileDirectory) {
+    setMaxPreference(100.0f);
+    setMinPreference(0.0f);
+    average = new FullRunningAverage();
+    this.dataFileDirectory = dataFileDirectory;
+  }
+
+  @Override
+  public double evaluate(RecommenderBuilder recommenderBuilder,
+                         DataModelBuilder dataModelBuilder,
+                         DataModel dataModel,
+                         double trainingPercentage,
+                         double evaluationPercentage) throws TasteException {
+
+    Recommender recommender = recommenderBuilder.buildRecommender(dataModel);
+
+    Collection<Callable<Void>> estimateCallables = Lists.newArrayList();
+    AtomicInteger noEstimateCounter = new AtomicInteger();
+    for (Pair<PreferenceArray,long[]> userData
+        : new DataFileIterable(KDDCupDataModel.getValidationFile(dataFileDirectory))) {
+      PreferenceArray validationPrefs = userData.getFirst();
+      long userID = validationPrefs.get(0).getUserID();
+      estimateCallables.add(
+          new PreferenceEstimateCallable(recommender, userID, validationPrefs, noEstimateCounter));
+    }
+
+    RunningAverageAndStdDev timing = new FullRunningAverageAndStdDev();
+    execute(estimateCallables, noEstimateCounter, timing);
+
+    double result = computeFinalEvaluation();
+    log.info("Evaluation result: {}", result);
+    return result;
+  }
+
+  // Use RMSE scoring:
+
+  @Override
+  protected void reset() {
+    average = new FullRunningAverage();
+  }
+
+  @Override
+  protected void processOneEstimate(float estimatedPreference, Preference realPref) {
+    double diff = realPref.getValue() - estimatedPreference;
+    average.addDatum(diff * diff);
+  }
+
+  @Override
+  protected double computeFinalEvaluation() {
+    return Math.sqrt(average.getAverage());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluatorRunner.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluatorRunner.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluatorRunner.java
new file mode 100644
index 0000000..deadc00
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluatorRunner.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.cli2.OptionException;
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.example.TasteOptionParser;
+import org.apache.mahout.cf.taste.example.kddcup.KDDCupDataModel;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command-line entry point: loads the training file from the ratings directory given in the arguments,
+ * runs {@link Track1RecommenderEvaluator} with a {@link Track1RecommenderBuilder}, and logs the score.
+ */
+public final class Track1RecommenderEvaluatorRunner {
+
+  private static final Logger log = LoggerFactory.getLogger(Track1RecommenderEvaluatorRunner.class);
+
+  private Track1RecommenderEvaluatorRunner() {
+  }
+
+  /**
+   * @param args options understood by {@link TasteOptionParser#getRatings(String[])}
+   * @throws IllegalArgumentException if no directory was given, or it does not exist or is not a directory
+   */
+  public static void main(String... args) throws IOException, TasteException, OptionException {
+    File ratingsDirectory = TasteOptionParser.getRatings(args);
+    if (ratingsDirectory == null) {
+      throw new IllegalArgumentException("No data directory");
+    }
+    boolean usableDirectory = ratingsDirectory.exists() && ratingsDirectory.isDirectory();
+    if (!usableDirectory) {
+      throw new IllegalArgumentException("Bad data file directory: " + ratingsDirectory);
+    }
+
+    Track1RecommenderEvaluator evaluator = new Track1RecommenderEvaluator(ratingsDirectory);
+    File trainingFile = KDDCupDataModel.getTrainingFile(ratingsDirectory);
+    DataModel trainingModel = new KDDCupDataModel(trainingFile);
+
+    // The evaluator ignores the data model builder and both percentages, hence null and NaN
+    double score = evaluator.evaluate(new Track1RecommenderBuilder(), null, trainingModel, Float.NaN, Float.NaN);
+    log.info(String.valueOf(score));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Runner.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Runner.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Runner.java
new file mode 100644
index 0000000..a0ff126
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Runner.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1;
+
+import org.apache.mahout.cf.taste.example.kddcup.DataFileIterable;
+import org.apache.mahout.cf.taste.example.kddcup.KDDCupDataModel;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.common.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * <p>Runs "track 1" of the KDD Cup competition using whatever recommender is inside {@link Track1Recommender}
+ * and attempts to output the result in the correct contest format.</p>
+ *
+ * <p>Run as: {@code Track1Runner [track 1 data file directory] [output file]}</p>
+ */
+public final class Track1Runner {
+
+  private static final Logger log = LoggerFactory.getLogger(Track1Runner.class);
+
+  /** Not instantiable; this class only hosts {@link #main(String[])}. */
+  private Track1Runner() {
+  }
+
+  /**
+   * Loads the training model, computes estimates for every user in the test file on a fixed-size
+   * thread pool (one thread per available processor) and writes the resulting bytes, in test-file
+   * order, to the output file.
+   *
+   * @param args {@code args[0]} is the track 1 data file directory, {@code args[1]} the output file
+   * @throws IllegalArgumentException if fewer than two arguments are given or the data directory
+   *     is not an existing directory
+   * @throws Exception if loading the data, computing a recommendation or writing the output fails
+   */
+  public static void main(String[] args) throws Exception {
+
+    // Validate the whole command line before doing any work: the output path is only needed at
+    // the very end, and discovering it is missing after the full run would waste the computation.
+    if (args == null || args.length < 2) {
+      throw new IllegalArgumentException(
+          "Usage: Track1Runner [track 1 data file directory] [output file]");
+    }
+
+    File dataFileDirectory = new File(args[0]);
+    if (!dataFileDirectory.exists() || !dataFileDirectory.isDirectory()) {
+      throw new IllegalArgumentException("Bad data file directory: " + dataFileDirectory);
+    }
+    File outputFile = new File(args[1]);
+
+    long start = System.currentTimeMillis();
+
+    KDDCupDataModel model = new KDDCupDataModel(KDDCupDataModel.getTrainingFile(dataFileDirectory));
+    Track1Recommender recommender = new Track1Recommender(model);
+
+    long end = System.currentTimeMillis();
+    log.info("Loaded model in {}s", (end - start) / 1000);
+    start = end;
+
+    Collection<Track1Callable> callables = new ArrayList<>();
+    for (Pair<PreferenceArray,long[]> tests : new DataFileIterable(KDDCupDataModel.getTestFile(dataFileDirectory))) {
+      PreferenceArray userTest = tests.getFirst();
+      callables.add(new Track1Callable(recommender, userTest));
+    }
+
+    int cores = Runtime.getRuntime().availableProcessors();
+    log.info("Running on {} cores", cores);
+    ExecutorService executor = Executors.newFixedThreadPool(cores);
+    List<Future<byte[]>> results;
+    try {
+      results = executor.invokeAll(callables);
+    } finally {
+      // Always release the pool: its worker threads are non-daemon and would otherwise keep
+      // the JVM alive if invokeAll is interrupted or rejects the tasks.
+      executor.shutdown();
+    }
+
+    end = System.currentTimeMillis();
+    log.info("Ran recommendations in {}s", (end - start) / 1000);
+    start = end;
+
+    try (OutputStream out = new BufferedOutputStream(new FileOutputStream(outputFile))) {
+      for (Future<byte[]> result : results) {
+        // get() rethrows a failed task's exception wrapped in an ExecutionException.
+        out.write(result.get());
+      }
+    }
+
+    end = System.currentTimeMillis();
+    log.info("Wrote output in {}s", (end - start) / 1000);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/DataModelFactorizablePreferences.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/DataModelFactorizablePreferences.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/DataModelFactorizablePreferences.java
new file mode 100644
index 0000000..022d78c
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/DataModelFactorizablePreferences.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1.svd;
+
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
+import org.apache.mahout.cf.taste.impl.model.GenericPreference;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.apache.mahout.cf.taste.model.Preference;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Adapter that copies the contents of a {@link DataModel} into memory so that it can be fed to
+ * {@link ParallelArraysSGDFactorizer}.
+ */
+public class DataModelFactorizablePreferences implements FactorizablePreferences {
+
+  private final FastIDSet userIDs;
+  private final FastIDSet itemIDs;
+
+  private final List<Preference> preferences;
+
+  private final float minPreference;
+  private final float maxPreference;
+
+  /**
+   * Reads the preference bounds, all user IDs, all item IDs and all preferences out of the model.
+   *
+   * @param dataModel the model to copy from
+   * @throws IllegalStateException if the model throws a {@link TasteException} while being read
+   */
+  public DataModelFactorizablePreferences(DataModel dataModel) {
+
+    this.minPreference = dataModel.getMinPreference();
+    this.maxPreference = dataModel.getMaxPreference();
+
+    // Accumulate into locals and publish to the final fields once the copy has succeeded.
+    FastIDSet seenUsers;
+    FastIDSet seenItems;
+    List<Preference> collected;
+    try {
+      seenUsers = new FastIDSet(dataModel.getNumUsers());
+      seenItems = new FastIDSet(dataModel.getNumItems());
+      collected = new ArrayList<>();
+
+      for (LongPrimitiveIterator users = dataModel.getUserIDs(); users.hasNext();) {
+        long user = users.nextLong();
+        seenUsers.add(user);
+        for (Preference rated : dataModel.getPreferencesFromUser(user)) {
+          long item = rated.getItemID();
+          seenItems.add(item);
+          // Store an independent copy rather than the object handed out by the model.
+          collected.add(new GenericPreference(user, item, rated.getValue()));
+        }
+      }
+    } catch (TasteException te) {
+      throw new IllegalStateException("Unable to create factorizable preferences!", te);
+    }
+
+    this.userIDs = seenUsers;
+    this.itemIDs = seenItems;
+    this.preferences = collected;
+  }
+
+  /** @return a fresh iterator over the collected user IDs */
+  @Override
+  public LongPrimitiveIterator getUserIDs() {
+    return userIDs.iterator();
+  }
+
+  /** @return a fresh iterator over the collected item IDs */
+  @Override
+  public LongPrimitiveIterator getItemIDs() {
+    return itemIDs.iterator();
+  }
+
+  /** @return the collected preferences, in the order they were read from the model */
+  @Override
+  public Iterable<Preference> getPreferences() {
+    return preferences;
+  }
+
+  /** @return the minimum preference value reported by the source model */
+  @Override
+  public float getMinPreference() {
+    return minPreference;
+  }
+
+  /** @return the maximum preference value reported by the source model */
+  @Override
+  public float getMaxPreference() {
+    return maxPreference;
+  }
+
+  /** @return the number of distinct user IDs collected */
+  @Override
+  public int numUsers() {
+    return userIDs.size();
+  }
+
+  /** @return the number of distinct item IDs collected */
+  @Override
+  public int numItems() {
+    return itemIDs.size();
+  }
+
+  /** @return the number of preferences collected */
+  @Override
+  public int numPreferences() {
+    return preferences.size();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/FactorizablePreferences.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/FactorizablePreferences.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/FactorizablePreferences.java
new file mode 100644
index 0000000..a126dec
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/FactorizablePreferences.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1.svd;
+
+import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
+import org.apache.mahout.cf.taste.model.Preference;
+
+/**
+ * Describes the input consumed by {@link ParallelArraysSGDFactorizer}: the users, the items,
+ * the preferences between them and the bounds of the preference values.
+ */
+public interface FactorizablePreferences {
+
+  /** @return the number of users */
+  int numUsers();
+
+  /** @return the number of items */
+  int numItems();
+
+  /** @return the number of preferences */
+  int numPreferences();
+
+  /** @return the lower bound of the preference values */
+  float getMinPreference();
+
+  /** @return the upper bound of the preference values */
+  float getMaxPreference();
+
+  /** @return an iterator over the user IDs */
+  LongPrimitiveIterator getUserIDs();
+
+  /** @return an iterator over the item IDs */
+  LongPrimitiveIterator getItemIDs();
+
+  /** @return all preferences */
+  Iterable<Preference> getPreferences();
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/02f75f99/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/KDDCupFactorizablePreferences.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/KDDCupFactorizablePreferences.java b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/KDDCupFactorizablePreferences.java
new file mode 100644
index 0000000..6dcef6b
--- /dev/null
+++ b/community/mahout-mr/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/KDDCupFactorizablePreferences.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.example.kddcup.track1.svd;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.mahout.cf.taste.example.kddcup.DataFileIterable;
+import org.apache.mahout.cf.taste.impl.common.AbstractLongPrimitiveIterator;
+import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
+import org.apache.mahout.cf.taste.model.Preference;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.common.Pair;
+
+import java.io.File;
+
+/**
+ * {@link FactorizablePreferences} backed directly by a KDD Cup data file. Preferences are read
+ * from the file each time {@link #getPreferences()} is iterated; the user, item and preference
+ * counts and the preference bounds are fixed constants rather than being derived from the file.
+ */
+public class KDDCupFactorizablePreferences implements FactorizablePreferences {
+
+  /** Fixed sizes and rating bounds reported by this class, independent of the file contents. */
+  private static final int NUM_USERS = 1000990;
+  private static final int NUM_ITEMS = 624961;
+  private static final int NUM_PREFERENCES = 252800275;
+  private static final float MIN_PREFERENCE = 0;
+  private static final float MAX_PREFERENCE = 100;
+
+  private final File dataFile;
+
+  /**
+   * @param dataFile the KDD Cup data file to read preferences from
+   */
+  public KDDCupFactorizablePreferences(File dataFile) {
+    this.dataFile = dataFile;
+  }
+
+  /** @return an iterator over the IDs {@code 0} (inclusive) to {@link #numUsers()} (exclusive) */
+  @Override
+  public LongPrimitiveIterator getUserIDs() {
+    return new FixedSizeLongIterator(numUsers());
+  }
+
+  /** @return an iterator over the IDs {@code 0} (inclusive) to {@link #numItems()} (exclusive) */
+  @Override
+  public LongPrimitiveIterator getItemIDs() {
+    return new FixedSizeLongIterator(numItems());
+  }
+
+  /**
+   * @return a lazy view that concatenates the preference array of every record in the data file;
+   *     the file is read through a new {@link DataFileIterable} on each call
+   */
+  @Override
+  public Iterable<Preference> getPreferences() {
+    Iterable<Iterable<Preference>> prefIterators =
+        Iterables.transform(new DataFileIterable(dataFile),
+          new Function<Pair<PreferenceArray,long[]>,Iterable<Preference>>() {
+            @Override
+            public Iterable<Preference> apply(Pair<PreferenceArray,long[]> from) {
+              return from.getFirst();
+            }
+          });
+    return Iterables.concat(prefIterators);
+  }
+
+  @Override
+  public float getMinPreference() {
+    return MIN_PREFERENCE;
+  }
+
+  @Override
+  public float getMaxPreference() {
+    return MAX_PREFERENCE;
+  }
+
+  @Override
+  public int numUsers() {
+    return NUM_USERS;
+  }
+
+  @Override
+  public int numItems() {
+    return NUM_ITEMS;
+  }
+
+  @Override
+  public int numPreferences() {
+    return NUM_PREFERENCES;
+  }
+
+  /**
+   * Iterates over the consecutive values {@code 0, 1, ..., maximum - 1}.
+   */
+  static class FixedSizeLongIterator extends AbstractLongPrimitiveIterator {
+
+    private long currentValue;
+    private final long maximum;
+
+    /**
+     * @param maximum exclusive upper bound of the values produced
+     */
+    FixedSizeLongIterator(long maximum) {
+      this.maximum = maximum;
+      currentValue = 0;
+    }
+
+    /**
+     * @return the next value, advancing the iterator
+     * @throws java.util.NoSuchElementException if the iterator is exhausted
+     */
+    @Override
+    public long nextLong() {
+      // Honour the Iterator contract instead of silently producing values >= maximum.
+      if (!hasNext()) {
+        throw new java.util.NoSuchElementException();
+      }
+      return currentValue++;
+    }
+
+    /**
+     * @return the next value without advancing the iterator
+     * @throws java.util.NoSuchElementException if the iterator is exhausted
+     */
+    @Override
+    public long peek() {
+      if (!hasNext()) {
+        throw new java.util.NoSuchElementException();
+      }
+      return currentValue;
+    }
+
+    /**
+     * Skips the given number of values; non-positive counts are ignored so the iterator
+     * can never be moved backwards.
+     */
+    @Override
+    public void skip(int n) {
+      if (n > 0) {
+        currentValue += n;
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return currentValue < maximum;
+    }
+
+    /** Removal is not supported: the values are generated, not stored. */
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+}