You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2009/12/07 21:24:11 UTC

svn commit: r888122 - in /lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop: ./ item/ pseudo/

Author: srowen
Date: Mon Dec  7 20:24:07 2009
New Revision: 888122

URL: http://svn.apache.org/viewvc?rev=888122&view=rev
Log:
Simplified input to pseudo-distributed job, made assorted fixes, added output compression, and dropped some cooccurrence data

Added:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java
      - copied, changed from r887673, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java
      - copied, changed from r887676, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
Removed:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderMapper.java
Modified:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/ReducerMetrics.java

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java?rev=888122&r1=888121&r2=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java Mon Dec  7 20:24:07 2009
@@ -29,6 +29,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
@@ -148,6 +150,7 @@
     jobConf.setClass("mapred.reducer.class", reducer, Reducer.class);
     jobConf.setClass("mapred.output.key.class", reducerKey, Writable.class);
     jobConf.setClass("mapred.output.value.class", reducerValue, Writable.class);
+    jobConf.setBoolean("mapred.output.compress", true);
 
     jobConf.setClass("mapred.output.format.class", outputFormat, OutputFormat.class);
     jobConf.set("mapred.output.dir", StringUtils.escapeString(outputPathPath.toString()));

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java?rev=888122&r1=888121&r2=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java Mon Dec  7 20:24:07 2009
@@ -84,7 +84,7 @@
 
   @Override
   public String toString() {
-    StringBuilder result = new StringBuilder(100);
+    StringBuilder result = new StringBuilder(200);
     result.append('[');
     boolean first = true;
     for (RecommendedItem item : recommended) {
@@ -95,7 +95,12 @@
       }
       result.append(item.getItemID());
       result.append(':');
-      result.append(item.getValue());
+      String valueString = String.valueOf(item.getValue());
+      // Is this truncation (it cuts the string at 6 characters rather than rounding) too crude?
+      if (valueString.length() > 6) {
+        valueString = valueString.substring(0, 6);
+      }
+      result.append(valueString);
     }
     result.append(']');
     return result.toString();

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java?rev=888122&r1=888121&r2=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java Mon Dec  7 20:24:07 2009
@@ -21,6 +21,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -52,6 +53,13 @@
   static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
   static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
 
+  private static final PathFilter IGNORABLE_FILES_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return !path.getName().startsWith("_logs");
+    }
+  };
+
   private FileSystem fs;
   private Path cooccurrencePath;
   private int recommendationsPerUser;
@@ -72,7 +80,7 @@
       IntWritable index = new IntWritable();
       LongWritable itemID = new LongWritable();
       Configuration conf = new Configuration();
-      for (FileStatus status : fs.listStatus(itemIDIndexPath)) {
+      for (FileStatus status : fs.listStatus(itemIDIndexPath, IGNORABLE_FILES_FILTER)) {
         SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
         while (reader.next(index, itemID)) {
           indexItemIDMap.put(index.get(), itemID.get());
@@ -94,7 +102,7 @@
     Configuration conf = new Configuration();
     Queue<RecommendedItem> topItems =
         new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1, Collections.reverseOrder());
-    for (FileStatus status : fs.listStatus(cooccurrencePath)) {
+    for (FileStatus status : fs.listStatus(cooccurrencePath, IGNORABLE_FILES_FILTER)) {
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
       while (reader.next(indexWritable, cooccurrenceVector)) {
         Long itemID = indexItemIDMap.get(indexWritable.get());

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java?rev=888122&r1=888121&r2=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java Mon Dec  7 20:24:07 2009
@@ -28,31 +28,63 @@
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Queue;
 
 public final class UserVectorToCooccurrenceMapper
     extends MapReduceBase
     implements Mapper<LongWritable, SparseVector, IntWritable, IntWritable> {
 
+  private static final int MAX_PREFS_CONSIDERED = 50;
+
   @Override
   public void map(LongWritable userID,
                   SparseVector userVector,
                   OutputCollector<IntWritable, IntWritable> output,
                   Reporter reporter) throws IOException {
 
+    double cutoff = userVector.size() <= MAX_PREFS_CONSIDERED ?
+        Double.NEGATIVE_INFINITY : findTopNPrefsCutoff(MAX_PREFS_CONSIDERED, userVector);
+
     Iterator<Vector.Element> it = userVector.iterateNonZero();
+
     while (it.hasNext()) {
-      int index1 = it.next().index();
-      Iterator<Vector.Element> it2 = userVector.iterateNonZero();
-      IntWritable itemWritable1 = new IntWritable(index1);
-      while (it2.hasNext()) {
-        int index2 = it2.next().index();
-        if (index1 != index2) {
-          output.collect(itemWritable1, new IntWritable(index2));
+      Vector.Element next1 = it.next();
+      if (next1.get() >= cutoff) {
+
+        int index1 = next1.index();
+        Iterator<Vector.Element> it2 = userVector.iterateNonZero();
+        IntWritable itemWritable1 = new IntWritable(index1);
+
+        while (it2.hasNext()) {
+          Vector.Element next2 = it2.next();
+          if (next2.get() >= cutoff) {
+
+            int index2 = next2.index();
+            if (index1 != index2) {
+              output.collect(itemWritable1, new IntWritable(index2));
+            }
+
+          }
         }
-      }
 
+      }
     }
+  }
 
+  private static double findTopNPrefsCutoff(int n, Vector userVector) {
+    Queue<Double> topPrefValues = new PriorityQueue<Double>(n + 1);
+    Iterator<Vector.Element> it = userVector.iterateNonZero();
+    while (it.hasNext()) {
+      double prefValue = it.next().get();
+      if (topPrefValues.size() < n) {
+        topPrefValues.add(prefValue);
+      } else if (prefValue > topPrefValues.peek()) {
+        topPrefValues.add(prefValue);
+        topPrefValues.poll();
+      }
+    }
+    return topPrefValues.peek();
   }
 
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java?rev=888122&r1=888121&r2=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java Mon Dec  7 20:24:07 2009
@@ -19,11 +19,11 @@
 
 import org.apache.commons.cli2.Option;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.cf.taste.hadoop.AbstractJob;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
@@ -37,7 +37,7 @@
  * It merely runs many {@link Recommender} instances on Hadoop, where each instance
  * is a normal non-distributed implementation.</p>
  *
- * <p>This class configures and runs a {@link RecommenderMapper} using Hadoop.</p>
+ * <p>This class configures and runs a {@link RecommenderReducer} using Hadoop.</p>
  *
  * <p>Command line arguments are:</p>
  *
@@ -46,9 +46,7 @@
  *   recommendations. Note that it must have a constructor which takes a
  *   {@link org.apache.mahout.cf.taste.model.DataModel} argument.</li>
  *  <li>numRecommendations: Number of recommendations to compute per user</li>
- *  <li>input: Directory containing a text file containing user IDs
- *   for which recommendations should be computed, one per line</li>
- *  <li>dataModelFile: Location of a data model file containing preference data,
+ *  <li>input: Location of a data model file containing preference data,
  *   suitable for use with {@link org.apache.mahout.cf.taste.impl.model.file.FileDataModel}</li>
  *  <li>output: output path where recommender output should go</li>
  *  <li>jarFile: JAR file containing implementation code</li>
@@ -81,9 +79,9 @@
  * <p>And launch:</p>
  *
  * {@code
- * hadoop jar recommender.jar org.apache.mahout.cf.taste.hadoop.RecommenderJob \
+ * hadoop jar recommender.jar org.apache.mahout.cf.taste.hadoop.pseudo.RecommenderJob \
  *   --recommenderClassName your.project.Recommender \
- *   --numRecommendations 10 --input input/users.txt --dataModelFile input/input.csv \
+ *   --numRecommendations 10 --input input/users.csv \
  *   --output output --jarFile recommender.jar
  * }
  */
@@ -92,34 +90,34 @@
   @Override
   public int run(String[] args) throws IOException {
 
-    Option recommendClassOpt = buildOption("recommenderClassName", "r", "Name of recommender class to instantiate", true);
-    Option numReccomendationsOpt = buildOption("numRecommendations", "n", "Number of recommendations per user", true);
-    Option dataModelFileOpt = buildOption("dataModelFile", "m", "File containing preference data", true);
+    Option recommendClassOpt =
+        buildOption("recommenderClassName", "r", "Name of recommender class to instantiate", true);
+    Option numReccomendationsOpt =
+        buildOption("numRecommendations", "n", "Number of recommendations per user", true);
 
-    Map<String,Object> parsedArgs = parseArguments(args, recommendClassOpt, numReccomendationsOpt, dataModelFileOpt);
-    String userIDFile = parsedArgs.get("--input").toString();
+    Map<String,Object> parsedArgs = parseArguments(args, recommendClassOpt, numReccomendationsOpt);
+    String inputFile = parsedArgs.get("--input").toString();
     String outputPath = parsedArgs.get("--output").toString();
     String jarFile = parsedArgs.get("--jarFile").toString();
 
     String recommendClassName = parsedArgs.get("--recommenderClassName").toString();
     int recommendationsPerUser = Integer.parseInt((String) parsedArgs.get("--numRecommendations"));
-    String dataModelFile = parsedArgs.get("--dataModelFile").toString();
 
-    JobConf jobConf = prepareJobConf(userIDFile,
+    JobConf jobConf = prepareJobConf(inputFile,
                                      outputPath,
                                      jarFile,
                                      TextInputFormat.class,
-                                     RecommenderMapper.class,
+                                     UserIDsMapper.class,
                                      LongWritable.class,
-                                     RecommendedItemsWritable.class,
-                                     IdentityReducer.class,
+                                     NullWritable.class,
+                                     RecommenderReducer.class,
                                      LongWritable.class,
                                      RecommendedItemsWritable.class,
                                      TextOutputFormat.class);
 
-    jobConf.set(RecommenderMapper.RECOMMENDER_CLASS_NAME, recommendClassName);
-    jobConf.setInt(RecommenderMapper.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
-    jobConf.set(RecommenderMapper.DATA_MODEL_FILE, dataModelFile);
+    jobConf.set(RecommenderReducer.RECOMMENDER_CLASS_NAME, recommendClassName);
+    jobConf.setInt(RecommenderReducer.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
+    jobConf.set(RecommenderReducer.DATA_MODEL_FILE, inputFile);
 
     JobClient.runJob(jobConf);
     return 0;

Copied: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java (from r887673, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderMapper.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java?p2=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java&p1=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderMapper.java&r1=887673&r2=888122&rev=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java Mon Dec  7 20:24:07 2009
@@ -20,11 +20,12 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.cf.taste.common.TasteException;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
@@ -41,19 +42,15 @@
 import java.util.List;
 
 /**
- * <p>The {@link Mapper} which takes as input a file of user IDs (treated as Strings, note), and for each unique user
+ * <p>The {@link Reducer} which takes as input the user IDs parsed out by the map phase, and for each unique user
  * ID, computes recommendations with the configured {@link Recommender}. The results are output as {@link
  * RecommendedItemsWritable}.</p>
  *
- * <p>Note that there is no corresponding {@link org.apache.hadoop.mapreduce.Reducer}; this implementation can only
- * partially take advantage of the mapreduce paradigm and only really leverages it for easy parallelization. Therefore,
- * use the {@link org.apache.hadoop.mapred.lib.IdentityReducer} when running this on Hadoop.</p>
- *
  * @see RecommenderJob
  */
-public final class RecommenderMapper
+public final class RecommenderReducer
     extends MapReduceBase
-    implements Mapper<LongWritable, Text, LongWritable, RecommendedItemsWritable> {
+    implements Reducer<LongWritable, NullWritable, LongWritable, RecommendedItemsWritable> {
 
   static final String RECOMMENDER_CLASS_NAME = "recommenderClassName";
   static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
@@ -97,13 +94,13 @@
     recommendationsPerUser = jobConf.getInt(RECOMMENDATIONS_PER_USER, 10);
   }
 
-
   @Override
-  public void map(LongWritable key,
-                  Text value,
-                  OutputCollector<LongWritable, RecommendedItemsWritable> output,
-                  Reporter reporter) throws IOException {
-    long userID = Long.parseLong(value.toString());
+  public void reduce(LongWritable key,
+                     Iterator<NullWritable> values,
+                     OutputCollector<LongWritable, RecommendedItemsWritable> output,
+                     Reporter reporter)
+      throws IOException {
+    long userID = key.get();
     List<RecommendedItem> recommendedItems;
     try {
       recommendedItems = recommender.recommend(userID, recommendationsPerUser);

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/ReducerMetrics.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/ReducerMetrics.java?rev=888122&r1=888121&r2=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/ReducerMetrics.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/ReducerMetrics.java Mon Dec  7 20:24:07 2009
@@ -17,7 +17,7 @@
 
 package org.apache.mahout.cf.taste.hadoop.pseudo;
 
-/** Custom metrics collected by {@link RecommenderMapper}. */
+/** Custom metrics collected by {@link RecommenderReducer}. */
 public enum ReducerMetrics {
 
   /** Number of unique users for which recommendations were produced */

Copied: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java (from r887676, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java?p2=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java&p1=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java&r1=887676&r2=888122&rev=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java Mon Dec  7 20:24:07 2009
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.mahout.cf.taste.hadoop.item;
+package org.apache.mahout.cf.taste.hadoop.pseudo;
 
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
@@ -26,27 +26,22 @@
 import org.apache.hadoop.mapred.Reporter;
 
 import java.io.IOException;
-import java.util.regex.Pattern;
 
-public final class ItemIDIndexMapper
+/**
+ * Extracts and emits all user IDs from the input file.
+ */
+public final class UserIDsMapper
     extends MapReduceBase
-    implements Mapper<LongWritable, Text, IntWritable, LongWritable> {
-
-  private static final Pattern COMMA = Pattern.compile(",");
+    implements Mapper<LongWritable, Text, LongWritable, NullWritable> {
 
   @Override
   public void map(LongWritable key,
                   Text value,
-                  OutputCollector<IntWritable, LongWritable> output,
+                  OutputCollector<LongWritable, NullWritable> output,
                   Reporter reporter) throws IOException {
-    String[] tokens = COMMA.split(value.toString());
-    long itemID = Long.parseLong(tokens[1]);
-    int index = itemIDToIndex(itemID);
-    output.collect(new IntWritable(index), new LongWritable(itemID));
-  }
-
-  static int itemIDToIndex(long itemID) {
-    return (int) (itemID) ^ (int) (itemID >>> 32);
+    String line = value.toString();
+    long userID = Long.parseLong(line.substring(0, line.indexOf(',')));
+    output.collect(new LongWritable(userID), NullWritable.get());
   }
 
 }
\ No newline at end of file