You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2010/05/06 19:55:15 UTC

svn commit: r941830 - in /lucene/mahout/trunk/core/src: main/java/org/apache/mahout/cf/taste/hadoop/similarity/ main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ test/java/org/apache/mahout/cf/taste/hadoop/similarity/ test/java/org/apache/ma...

Author: srowen
Date: Thu May  6 17:55:15 2010
New Revision: 941830

URL: http://svn.apache.org/viewvc?rev=941830&view=rev
Log:
MAHOUT-389

Added:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/CoRating.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedSimilarity.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithItemVectorWeightArrayWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithItemVectorWeightWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedSimilarityTest.java
Removed:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CosineSimilarityReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithLengthArrayWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithLengthWritable.java
Modified:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/CoRating.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/CoRating.java?rev=941830&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/CoRating.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/CoRating.java Thu May  6 17:55:15 2010
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.RandomUtils;
+
+/**
+ * models the pair of preference values that a single user has given to two items, X and Y
+ */
+public final class CoRating implements Writable {
+
+  private float prefValueX;
+  private float prefValueY;
+
+  public CoRating() {
+  }
+
+  public CoRating(float prefValueX, float prefValueY) {
+    this.prefValueX = prefValueX;
+    this.prefValueY = prefValueY;
+  }
+
+  public float getPrefValueX() {
+    return prefValueX;
+  }
+
+  public float getPrefValueY() {
+    return prefValueY;
+  }
+
+  @Override
+  public int hashCode() {
+    return RandomUtils.hashFloat(prefValueX) + 31 * RandomUtils.hashFloat(prefValueY);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof CoRating) {
+      CoRating other = (CoRating) obj;
+      // Float.compare instead of ==, a NaN preference value has to equal itself to keep equals() reflexive
+      return Float.compare(prefValueX, other.prefValueX) == 0 && Float.compare(prefValueY, other.prefValueY) == 0;
+    }
+    return false;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    prefValueX = in.readFloat();
+    prefValueY = in.readFloat();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeFloat(prefValueX);
+    out.writeFloat(prefValueY);
+  }
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java?rev=941830&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java Thu May  6 17:55:15 2010
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity;
+
+import java.util.Iterator;
+
+public class DistributedPearsonCorrelationSimilarity implements DistributedSimilarity {
+
+  /** computes the Pearson correlation of the given co-ratings, the two vector weights are not used here */
+  @Override
+  public double similarity(Iterator<CoRating> coRatings, double weightOfItemVectorX, double weightOfItemVectorY) {
+
+    int numCoRatings = 0;
+    double totalX = 0.0;
+    double totalY = 0.0;
+    double totalXY = 0.0;
+    double totalSquaredX = 0.0;
+    double totalSquaredY = 0.0;
+
+    // accumulate all required sums in a single pass over the co-ratings
+    for (; coRatings.hasNext(); numCoRatings++) {
+      CoRating current = coRatings.next();
+      double valueX = current.getPrefValueX();
+      double valueY = current.getPrefValueY();
+      totalXY += valueX * valueY;
+      totalX += valueX;
+      totalSquaredX += valueX * valueX;
+      totalY += valueY;
+      totalSquaredY += valueY * valueY;
+    }
+
+    // an empty iterator ends up here as well, because the sum is still 0.0 in that case
+    if (totalXY == 0.0) {
+      return Double.NaN;
+    }
+
+    // Center the data around the means. The full expansions, e.g.
+    //   sumXY - meanY * sumX - meanX * sumY + n * meanX * meanY
+    // collapse to the short forms used below, because sumX == n * meanX and sumY == n * meanY
+    double n = numCoRatings;
+    double averageX = totalX / n;
+    double averageY = totalY / n;
+    double centeredXY = totalXY - averageY * totalX;
+    double centeredSquaredX = totalSquaredX - averageX * totalX;
+    double centeredSquaredY = totalSquaredY - averageY * totalY;
+
+    double normProduct = Math.sqrt(centeredSquaredX) * Math.sqrt(centeredSquaredY);
+    if (normProduct == 0.0) {
+      // at least one side has the same rating everywhere, no meaningful correlation can be computed
+      return Double.NaN;
+    }
+
+    return centeredXY / normProduct;
+  }
+
+  /** there is no per-vector weight in this measure, NaN is handed out as a placeholder */
+  @Override
+  public double weightOfItemVector(Iterator<Float> prefValues) {
+    return Double.NaN;
+  }
+
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedSimilarity.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedSimilarity.java?rev=941830&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedSimilarity.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedSimilarity.java Thu May  6 17:55:15 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity;
+
+import java.util.Iterator;
+
+/**
+ * Models the pairwise similarity computation of item-vectors in a distributed manner
+ */
+public interface DistributedSimilarity {
+
+  /**
+   * compute the weight of an item vector (called in an early stage of the map-reduce steps)
+   *
+   * @param prefValues all preference values of the item vector
+   * @return the weight of the item vector, it is later handed to {@link #similarity(Iterator, double, double)}
+   */
+  double weightOfItemVector(Iterator<Float> prefValues);
+
+  /**
+   * compute the similarity for a pair of item-vectors
+   *
+   * @param coratings all coratings for these items
+   * @param weightOfItemVectorX the weight computed for the first vector
+   * @param weightOfItemVectorY the weight computed for the second vector
+   * @return the similarity of the two item-vectors, {@link Double#NaN} signals that it could not be computed
+   */
+  double similarity(Iterator<CoRating> coratings,
+                    double weightOfItemVectorX,
+                    double weightOfItemVectorY);
+
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java?rev=941830&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java Thu May  6 17:55:15 2010
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity;
+
+import java.util.Iterator;
+
+public final class DistributedUncenteredZeroAssumingCosineSimilarity
+    implements DistributedSimilarity {
+
+  /** cosine measure: dot product of the co-ratings divided by the product of the two vector weights */
+  @Override
+  public double similarity(Iterator<CoRating> coRatings, double weightOfItemVectorX, double weightOfItemVectorY) {
+    double sumXY = 0.0;
+    while (coRatings.hasNext()) {
+      CoRating coRating = coRatings.next();
+      // widen to double before multiplying, a float product would be rounded to float precision first
+      sumXY += (double) coRating.getPrefValueX() * coRating.getPrefValueY();
+    }
+    if (sumXY == 0.0) {
+      return Double.NaN;
+    }
+    return sumXY / (weightOfItemVectorX * weightOfItemVectorY);
+  }
+
+  /** euclidean length of the item vector, preference values that are NaN are left out */
+  @Override
+  public double weightOfItemVector(Iterator<Float> prefValues) {
+    double squaredLength = 0.0;
+    while (prefValues.hasNext()) {
+      double prefValue = prefValues.next();
+      if (!Double.isNaN(prefValue)) {
+        squaredLength += prefValue * prefValue;
+      }
+    }
+    return Math.sqrt(squaredLength);
+  }
+
+}

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java?rev=941830&r1=941829&r2=941830&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java Thu May  6 17:55:15 2010
@@ -19,40 +19,40 @@ package org.apache.mahout.cf.taste.hadoo
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.hadoop.similarity.CoRating;
 
 /**
  * map out each pair of items that appears in the same user-vector together with the multiplied vector lengths
  * of the associated item vectors
  */
 public final class CopreferredItemsMapper extends MapReduceBase
-    implements Mapper<LongWritable,ItemPrefWithLengthArrayWritable,ItemPairWritable,FloatWritable> {
+    implements Mapper<LongWritable,ItemPrefWithItemVectorWeightArrayWritable,ItemPairWritable,CoRating> {
 
   @Override
   public void map(LongWritable user,
-                  ItemPrefWithLengthArrayWritable itemPrefsArray,
-                  OutputCollector<ItemPairWritable,FloatWritable> output,
+                  ItemPrefWithItemVectorWeightArrayWritable itemPrefsArray,
+                  OutputCollector<ItemPairWritable, CoRating> output,
                   Reporter reporter)
       throws IOException {
 
-    ItemPrefWithLengthWritable[] itemPrefs = itemPrefsArray.getItemPrefs();
+    ItemPrefWithItemVectorWeightWritable[] itemPrefs = itemPrefsArray.getItemPrefs();
 
     for (int n = 0; n < itemPrefs.length; n++) {
-      ItemPrefWithLengthWritable itemN = itemPrefs[n];
+      ItemPrefWithItemVectorWeightWritable itemN = itemPrefs[n];
       long itemNID = itemN.getItemID();
-      double itemNLength = itemN.getLength();
+      double itemNWeight = itemN.getWeight();
       float itemNValue = itemN.getPrefValue();
       for (int m = n + 1; m < itemPrefs.length; m++) {
-        ItemPrefWithLengthWritable itemM = itemPrefs[m];
+        ItemPrefWithItemVectorWeightWritable itemM = itemPrefs[m];
         long itemAID = Math.min(itemNID, itemM.getItemID());
         long itemBID = Math.max(itemNID, itemM.getItemID());
-        ItemPairWritable pair = new ItemPairWritable(itemAID, itemBID, itemNLength * itemM.getLength());
-        output.collect(pair, new FloatWritable(itemNValue * itemM.getPrefValue()));
+        ItemPairWritable pair = new ItemPairWritable(itemAID, itemBID, itemNWeight, itemM.getWeight());
+        output.collect(pair, new CoRating(itemNValue, itemM.getPrefValue()));
       }
     }
 

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java?rev=941830&r1=941829&r2=941830&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java Thu May  6 17:55:15 2010
@@ -31,14 +31,16 @@ import org.apache.mahout.cf.taste.hadoop
 public final class ItemPairWritable implements WritableComparable<ItemPairWritable> {
 
   private EntityEntityWritable itemItemWritable;
-  private double multipliedLength;
+  private double itemAWeight;
+  private double itemBWeight;
 
   public ItemPairWritable() {
   }
 
-  public ItemPairWritable(long itemAID, long itemBID, double multipliedLength) {
+  public ItemPairWritable(long itemAID, long itemBID, double itemAWeight, double itemBWeight) {
     this.itemItemWritable = new EntityEntityWritable(itemAID, itemBID);
-    this.multipliedLength = multipliedLength;
+    this.itemAWeight = itemAWeight;
+    this.itemBWeight = itemBWeight;
   }
 
   public long getItemAID() {
@@ -53,21 +55,27 @@ public final class ItemPairWritable impl
     return itemItemWritable;
   }
 
-  public double getMultipliedLength() {
-    return multipliedLength;
+  public double getItemAWeight() {
+    return itemAWeight;
+  }
+
+  public double getItemBWeight() {
+    return itemBWeight;
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     itemItemWritable = new EntityEntityWritable();
     itemItemWritable.readFields(in);
-    multipliedLength = in.readDouble();
+    itemAWeight = in.readDouble();
+    itemBWeight = in.readDouble();
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     itemItemWritable.write(out);
-    out.writeDouble(multipliedLength);
+    out.writeDouble(itemAWeight);
+    out.writeDouble(itemBWeight);
   }
 
   @Override

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithItemVectorWeightArrayWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithItemVectorWeightArrayWritable.java?rev=941830&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithItemVectorWeightArrayWritable.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithItemVectorWeightArrayWritable.java Thu May  6 17:55:15 2010
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity.item;
+
+import org.apache.hadoop.io.ArrayWritable;
+
+/**
+ * An {@link ArrayWritable} holding {@link ItemPrefWithItemVectorWeightWritable}s
+ *
+ * Used as user-vector: {@link CopreferredItemsMapper} receives one such array per user
+ */
+public class ItemPrefWithItemVectorWeightArrayWritable extends ArrayWritable {
+
+  public ItemPrefWithItemVectorWeightArrayWritable() {
+    super(ItemPrefWithItemVectorWeightWritable.class);
+  }
+
+  public ItemPrefWithItemVectorWeightArrayWritable(ItemPrefWithItemVectorWeightWritable[] itemPrefs) {
+    super(ItemPrefWithItemVectorWeightWritable.class, itemPrefs);
+  }
+
+  public ItemPrefWithItemVectorWeightWritable[] getItemPrefs() {
+    return (ItemPrefWithItemVectorWeightWritable[]) toArray();
+  }
+
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithItemVectorWeightWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithItemVectorWeightWritable.java?rev=941830&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithItemVectorWeightWritable.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithItemVectorWeightWritable.java Thu May  6 17:55:15 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity.item;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.common.RandomUtils;
+
+/**
+ * A {@link Writable} encapsulating the preference for an item
+ * stored along with the weight of the item-vector, see
+ * {@link org.apache.mahout.cf.taste.hadoop.similarity.DistributedSimilarity#weightOfItemVector(java.util.Iterator)}
+ */
+public final class ItemPrefWithItemVectorWeightWritable implements Writable, Cloneable {
+
+  private EntityPrefWritable itemPref;
+  private double weight;
+
+  public ItemPrefWithItemVectorWeightWritable() {
+    // nothing to set up here, readFields() fills in the state
+  }
+
+  public ItemPrefWithItemVectorWeightWritable(long itemID, double weight, float prefValue) {
+    this.itemPref = new EntityPrefWritable(itemID, prefValue);
+    this.weight = weight;
+  }
+
+  public long getItemID() {
+    return itemPref.getID();
+  }
+
+  public double getWeight() {
+    return weight;
+  }
+
+  public float getPrefValue() {
+    return itemPref.getPrefValue();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    itemPref.write(out);
+    out.writeDouble(weight);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    itemPref = new EntityPrefWritable();
+    itemPref.readFields(in);
+    weight = in.readDouble();
+  }
+
+  @Override
+  public int hashCode() {
+    return itemPref.hashCode() + 31 * RandomUtils.hashDouble(weight);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof ItemPrefWithItemVectorWeightWritable) {
+      ItemPrefWithItemVectorWeightWritable other = (ItemPrefWithItemVectorWeightWritable) o;
+      // Double.compare instead of ==, a NaN weight (the Pearson measure hands out NaN) has to equal itself
+      return itemPref.equals(other.itemPref) && Double.compare(weight, other.weight) == 0;
+    }
+    return false;
+  }
+
+  @Override
+  public ItemPrefWithItemVectorWeightWritable clone() {
+    return new ItemPrefWithItemVectorWeightWritable(itemPref.getID(), weight, itemPref.getPrefValue());
+  }
+}

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java?rev=941830&r1=941829&r2=941830&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java Thu May  6 17:55:15 2010
@@ -17,11 +17,12 @@
 
 package org.apache.mahout.cf.taste.hadoop.similarity.item;
 
+import java.io.IOException;
 import java.util.Map;
 
+import org.apache.commons.cli2.Option;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -34,6 +35,8 @@ import org.apache.mahout.cf.taste.hadoop
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritableArrayWritable;
 import org.apache.mahout.cf.taste.hadoop.ToUserPrefsMapper;
+import org.apache.mahout.cf.taste.hadoop.similarity.CoRating;
+import org.apache.mahout.cf.taste.hadoop.similarity.DistributedSimilarity;
 import org.apache.mahout.common.AbstractJob;
 
 /**
@@ -84,6 +87,8 @@ import org.apache.mahout.common.Abstract
  * the form userID,itemID,preference
  * computed, one per line</li>
  * <li>-Dmapred.output.dir=(path): output path where the computations output should go</li>
+ * <li>--similarityClassname (classname): an implementation of {@link DistributedSimilarity} used to compute the
+ * similarity</li>
  * </ol>
  *
  *
@@ -95,15 +100,24 @@ import org.apache.mahout.common.Abstract
  */
 public final class ItemSimilarityJob extends AbstractJob {
 
+  public static final String DISTRIBUTED_SIMILARITY_CLASSNAME =
+    "org.apache.mahout.cf.taste.hadoop.similarity.item.ItemSimilarityJob.distributedSimilarityClassname";
+
   @Override
-  public int run(String[] args) throws Exception {
+  public int run(String[] args) throws IOException {
+
+    Option similarityClassOpt = AbstractJob.buildOption("similarityClassname", "s",
+    "Name of distributed similarity class to instantiate");
 
-    Map<String,String> parsedArgs = AbstractJob.parseArguments(args);
+    Map<String,String> parsedArgs = AbstractJob.parseArguments(args, similarityClassOpt);
     if (parsedArgs == null) {
       return -1;
     }
 
     Configuration originalConf = getConf();
+
+    String distributedSimilarityClassname = parsedArgs.get("--similarityClassname");
+
     String inputPath = originalConf.get("mapred.input.dir");
     String outputPath = originalConf.get("mapred.output.dir");
     String tempDirPath = parsedArgs.get("--tempDir");
@@ -128,11 +142,13 @@ public final class ItemSimilarityJob ext
                                          SequenceFileInputFormat.class,
                                          PreferredItemsPerUserMapper.class,
                                          LongWritable.class,
-                                         ItemPrefWithLengthWritable.class,
+                                         ItemPrefWithItemVectorWeightWritable.class,
                                          PreferredItemsPerUserReducer.class,
                                          LongWritable.class,
-                                         ItemPrefWithLengthArrayWritable.class,
+                                         ItemPrefWithItemVectorWeightArrayWritable.class,
                                          SequenceFileOutputFormat.class);
+
+    userVectors.set(DISTRIBUTED_SIMILARITY_CLASSNAME, distributedSimilarityClassname);
     JobClient.runJob(userVectors);
 
     JobConf similarity = prepareJobConf(userVectorsPath,
@@ -140,11 +156,13 @@ public final class ItemSimilarityJob ext
                                         SequenceFileInputFormat.class,
                                         CopreferredItemsMapper.class,
                                         ItemPairWritable.class,
-                                        FloatWritable.class,
-                                        CosineSimilarityReducer.class,
+                                        CoRating.class,
+                                        SimilarityReducer.class,
                                         EntityEntityWritable.class,
                                         DoubleWritable.class,
                                         TextOutputFormat.class);
+
+    similarity.set(DISTRIBUTED_SIMILARITY_CLASSNAME, distributedSimilarityClassname);
     JobClient.runJob(similarity);
 
     return 0;
@@ -154,4 +172,26 @@ public final class ItemSimilarityJob ext
     ToolRunner.run(new ItemSimilarityJob(), args);
   }
 
+  /**
+   * instantiates the {@link DistributedSimilarity} implementation with the given name via its no-arg constructor
+   *
+   * @param classname fully qualified name of a class implementing {@link DistributedSimilarity}
+   * @return a new instance of that class
+   * @throws IllegalStateException if no name is given or the class cannot be found, instantiated or accessed
+   */
+  static DistributedSimilarity instantiateSimilarity(String classname) {
+    if (classname == null) {
+      throw new IllegalStateException("No DistributedSimilarity class name has been configured");
+    }
+    try {
+      return (DistributedSimilarity) Class.forName(classname).newInstance();
+    } catch (ClassNotFoundException cnfe) {
+      throw new IllegalStateException("Unable to find similarity class " + classname, cnfe);
+    } catch (InstantiationException ie) {
+      throw new IllegalStateException("Unable to instantiate similarity class " + classname, ie);
+    } catch (IllegalAccessException iae) {
+      throw new IllegalStateException("Unable to access similarity class " + classname, iae);
+    }
+  }
+
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java?rev=941830&r1=941829&r2=941830&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java Thu May  6 17:55:15 2010
@@ -18,44 +18,79 @@
 package org.apache.mahout.cf.taste.hadoop.similarity.item;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritableArrayWritable;
+import org.apache.mahout.cf.taste.hadoop.similarity.DistributedSimilarity;
 
 /**
- * for each item-vector, we compute its length here and map out all entries with the user as key,
+ * for each item-vector, we compute its weight here and map out all entries with the user as key,
  * so we can create the user-vectors in the reducer
  */
 public final class PreferredItemsPerUserMapper extends MapReduceBase
-    implements Mapper<LongWritable,EntityPrefWritableArrayWritable,LongWritable,ItemPrefWithLengthWritable> {
+    implements Mapper<LongWritable,EntityPrefWritableArrayWritable,LongWritable,ItemPrefWithItemVectorWeightWritable> {
+
+  private DistributedSimilarity distributedSimilarity;
+
+  @Override
+  public void configure(JobConf jobConf) {
+    super.configure(jobConf);
+    distributedSimilarity =
+      ItemSimilarityJob.instantiateSimilarity(jobConf.get(ItemSimilarityJob.DISTRIBUTED_SIMILARITY_CLASSNAME));
+  }
 
   @Override
   public void map(LongWritable item,
                   EntityPrefWritableArrayWritable userPrefsArray,
-                  OutputCollector<LongWritable,ItemPrefWithLengthWritable> output,
+                  OutputCollector<LongWritable,ItemPrefWithItemVectorWeightWritable> output,
                   Reporter reporter) throws IOException {
 
     EntityPrefWritable[] userPrefs = userPrefsArray.getPrefs();
 
-    double length = 0.0;
+    double weight = distributedSimilarity.weightOfItemVector(new UserPrefsIterator(userPrefs));
+
     for (EntityPrefWritable userPref : userPrefs) {
-      double value = userPref.getPrefValue();
-      length += value * value;
+      output.collect(new LongWritable(userPref.getID()),
+          new ItemPrefWithItemVectorWeightWritable(item.get(), weight, userPref.getPrefValue()));
     }
+  }
 
-    length = Math.sqrt(length);
+  public static class UserPrefsIterator implements Iterator<Float> {
 
-    for (EntityPrefWritable userPref : userPrefs) {
-      output.collect(new LongWritable(userPref.getID()),
-          new ItemPrefWithLengthWritable(item.get(), length, userPref.getPrefValue()));
+    private int index;
+    private final EntityPrefWritable[] userPrefs;
+
+    public UserPrefsIterator(EntityPrefWritable[] userPrefs) {
+      this.userPrefs = userPrefs;
+      this.index = 0;
     }
 
-  }
+    @Override
+    public boolean hasNext() {
+      return (index < userPrefs.length);
+    }
+
+    @Override
+    public Float next() {
+      if (index >= userPrefs.length) {
+        throw new NoSuchElementException();
+      }
+      return userPrefs[index++].getPrefValue();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
 
+  }
 
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java?rev=941830&r1=941829&r2=941830&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java Thu May  6 17:55:15 2010
@@ -29,23 +29,25 @@ import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 
 public final class PreferredItemsPerUserReducer extends MapReduceBase
-    implements Reducer<LongWritable,ItemPrefWithLengthWritable, LongWritable,ItemPrefWithLengthArrayWritable> {
+    implements Reducer<LongWritable,ItemPrefWithItemVectorWeightWritable, LongWritable,ItemPrefWithItemVectorWeightArrayWritable> {
 
   @Override
   public void reduce(LongWritable user,
-                     Iterator<ItemPrefWithLengthWritable> itemPrefs,
-                     OutputCollector<LongWritable,ItemPrefWithLengthArrayWritable> output,
+                     Iterator<ItemPrefWithItemVectorWeightWritable> itemPrefs,
+                     OutputCollector<LongWritable,ItemPrefWithItemVectorWeightArrayWritable> output,
                      Reporter reporter)
       throws IOException {
 
-    Set<ItemPrefWithLengthWritable> itemPrefsWithLength = new HashSet<ItemPrefWithLengthWritable>();
+    Set<ItemPrefWithItemVectorWeightWritable> itemPrefsWithItemVectorWeight
+        = new HashSet<ItemPrefWithItemVectorWeightWritable>();
 
     while (itemPrefs.hasNext()) {
-      itemPrefsWithLength.add(itemPrefs.next().clone());
+      itemPrefsWithItemVectorWeight.add(itemPrefs.next().clone());
     }
 
-    output.collect(user, new ItemPrefWithLengthArrayWritable(
-        itemPrefsWithLength.toArray(new ItemPrefWithLengthWritable[itemPrefsWithLength.size()])));
+    output.collect(user, new ItemPrefWithItemVectorWeightArrayWritable(
+        itemPrefsWithItemVectorWeight.toArray(
+        new ItemPrefWithItemVectorWeightWritable[itemPrefsWithItemVectorWeight.size()])));
   }
 
 

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java?rev=941830&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java Thu May  6 17:55:15 2010
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity.item;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
+import org.apache.mahout.cf.taste.hadoop.similarity.CoRating;
+import org.apache.mahout.cf.taste.hadoop.similarity.DistributedSimilarity;
+
+/**
+ * Finally computes the similarity for each item pair that has been co-rated at least once.
+ */
+public final class SimilarityReducer extends MapReduceBase
+    implements Reducer<ItemPairWritable,CoRating,EntityEntityWritable,DoubleWritable> {
+
+  private DistributedSimilarity distributedSimilarity;
+
+  @Override
+  public void configure(JobConf jobConf) {
+    super.configure(jobConf);
+    distributedSimilarity =
+      ItemSimilarityJob.instantiateSimilarity(jobConf.get(ItemSimilarityJob.DISTRIBUTED_SIMILARITY_CLASSNAME));
+  }
+
+  @Override
+  public void reduce(ItemPairWritable pair,
+                     Iterator<CoRating> coRatings,
+                     OutputCollector<EntityEntityWritable,DoubleWritable> output,
+                     Reporter reporter)
+      throws IOException {
+
+    double similarity =
+      distributedSimilarity.similarity(coRatings, pair.getItemAWeight(), pair.getItemBWeight());
+
+    if (!Double.isNaN(similarity)) {
+      output.collect(pair.getItemItemWritable(), new DoubleWritable(similarity));
+    }
+  }
+
+}

Added: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedSimilarityTest.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedSimilarityTest.java?rev=941830&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedSimilarityTest.java (added)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedSimilarityTest.java Thu May  6 17:55:15 2010
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.mahout.cf.taste.impl.TasteTestCase;
+
+public final class DistributedSimilarityTest extends TasteTestCase {
+
+  public void testUncenteredZeroAssumingCosine() throws Exception {
+
+    DistributedSimilarity similarity = new DistributedUncenteredZeroAssumingCosineSimilarity();
+
+    assertSimilar(similarity, new Float[] { Float.NaN, Float.NaN, Float.NaN, Float.NaN, 1.0f },
+        new Float[] { Float.NaN, 1.0f, 1.0f, 1.0f, 1.0f }, 0.5);
+
+    assertSimilar(similarity, new Float[] { Float.NaN, 1.0f }, new Float[] { 1.0f, Float.NaN }, Double.NaN);
+    assertSimilar(similarity, new Float[] { 1.0f, Float.NaN }, new Float[] { 1.0f, Float.NaN }, 1.0);
+  }
+
+  public void testPearsonCorrelation() throws Exception {
+
+    DistributedSimilarity similarity = new DistributedPearsonCorrelationSimilarity();
+
+    assertSimilar(similarity, new Float[] { 3.0f, -2.0f }, new Float[] { 3.0f, -2.0f }, 1.0);
+    assertSimilar(similarity, new Float[] { 3.0f, 3.0f }, new Float[] { 3.0f, 3.0f }, Double.NaN);
+    assertSimilar(similarity, new Float[] { Float.NaN, 3.0f }, new Float[] { 3.0f, Float.NaN }, Double.NaN);
+  }
+
+  private static void assertSimilar(DistributedSimilarity similarity,
+                                    Float[] prefsX,
+                                    Float[] prefsY,
+                                    double expectedSimilarity) {
+
+    double weightX = similarity.weightOfItemVector(Arrays.asList(prefsX).iterator());
+    double weightY = similarity.weightOfItemVector(Arrays.asList(prefsY).iterator());
+
+    List<CoRating> coRatings = new LinkedList<CoRating>();
+
+    for (int n = 0; n < prefsX.length; n++) {
+      Float x = prefsX[n];
+      Float y = prefsY[n];
+
+      if (!x.isNaN() && !y.isNaN()) {
+        coRatings.add(new CoRating(x, y));
+      }
+    }
+
+    double result = similarity.similarity(coRatings.iterator(), weightX, weightY);
+    assertEquals(expectedSimilarity, result, EPSILON);
+  }
+
+}

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java?rev=941830&r1=941829&r2=941830&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java Thu May  6 17:55:15 2010
@@ -30,26 +30,26 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.easymock.classextension.EasyMock;
-import org.easymock.IArgumentMatcher;
-
+import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritableArrayWritable;
-import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
 import org.apache.mahout.cf.taste.hadoop.ToUserPrefsMapper;
+import org.apache.mahout.cf.taste.hadoop.similarity.CoRating;
+import org.apache.mahout.cf.taste.hadoop.similarity.DistributedUncenteredZeroAssumingCosineSimilarity;
 import org.apache.mahout.common.MahoutTestCase;
+import org.easymock.IArgumentMatcher;
+import org.easymock.classextension.EasyMock;
 
 /**
  * Unit tests for the mappers and reducers in org.apache.mahout.cf.taste.hadoop.similarity
  * Integration test with a mini-file at the end
  *
  */
-public class ItemSimilarityTest extends MahoutTestCase {
-
+public final class ItemSimilarityTest extends MahoutTestCase {
 
   public void testUserPrefsPerItemMapper() throws Exception {
     OutputCollector<LongWritable,LongWritable> output =
@@ -112,35 +112,39 @@ public class ItemSimilarityTest extends 
   }
 
   public void testPreferredItemsPerUserMapper() throws Exception {
-    OutputCollector<LongWritable,ItemPrefWithLengthWritable> output =
+    OutputCollector<LongWritable,ItemPrefWithItemVectorWeightWritable> output =
         EasyMock.createMock(OutputCollector.class);
-    EntityPrefWritableArrayWritable userPrefs =
-        EasyMock.createMock(EntityPrefWritableArrayWritable.class);
-
-    EasyMock.expect(userPrefs.getPrefs()).andReturn(
+    EntityPrefWritableArrayWritable userPrefs = new EntityPrefWritableArrayWritable(
         new EntityPrefWritable[] {
             new EntityPrefWritable(12L, 2.0f),
             new EntityPrefWritable(56L, 3.0f) });
 
-    double length = Math.sqrt(Math.pow(2.0f, 2) + Math.pow(3.0f, 2));
+    double weight =
+      new DistributedUncenteredZeroAssumingCosineSimilarity().weightOfItemVector(Arrays.asList(2.0f, 3.0f).iterator());
+
+    output.collect(new LongWritable(12L), new ItemPrefWithItemVectorWeightWritable(34L, weight, 2.0f));
+    output.collect(new LongWritable(56L), new ItemPrefWithItemVectorWeightWritable(34L, weight, 3.0f));
 
-    output.collect(new LongWritable(12L), new ItemPrefWithLengthWritable(34L, length, 2.0f));
-    output.collect(new LongWritable(56L), new ItemPrefWithLengthWritable(34L, length, 3.0f));
+    JobConf conf = new JobConf();
+    conf.set(ItemSimilarityJob.DISTRIBUTED_SIMILARITY_CLASSNAME,
+        "org.apache.mahout.cf.taste.hadoop.similarity.DistributedUncenteredZeroAssumingCosineSimilarity");
 
-    EasyMock.replay(output, userPrefs);
+    EasyMock.replay(output);
 
-    new PreferredItemsPerUserMapper().map(new LongWritable(34L), userPrefs, output, null);
+    PreferredItemsPerUserMapper mapper = new PreferredItemsPerUserMapper();
+    mapper.configure(conf);
+    mapper.map(new LongWritable(34L), userPrefs, output, null);
 
-    EasyMock.verify(output, userPrefs);
+    EasyMock.verify(output);
   }
 
   public void testPreferredItemsPerUserReducer() throws Exception {
 
-    List<ItemPrefWithLengthWritable> itemPrefs =
-        Arrays.asList(new ItemPrefWithLengthWritable(34L, 5.0, 1.0f),
-                      new ItemPrefWithLengthWritable(56L, 7.0, 2.0f));
+    List<ItemPrefWithItemVectorWeightWritable> itemPrefs =
+        Arrays.asList(new ItemPrefWithItemVectorWeightWritable(34L, 5.0, 1.0f),
+                      new ItemPrefWithItemVectorWeightWritable(56L, 7.0, 2.0f));
 
-    OutputCollector<LongWritable,ItemPrefWithLengthArrayWritable> output =
+    OutputCollector<LongWritable,ItemPrefWithItemVectorWeightArrayWritable> output =
         EasyMock.createMock(OutputCollector.class);
 
     output.collect(EasyMock.eq(new LongWritable(12L)), equalToItemPrefs(itemPrefs));
@@ -153,21 +157,21 @@ public class ItemSimilarityTest extends 
     EasyMock.verify(output);
   }
 
-  static ItemPrefWithLengthArrayWritable equalToItemPrefs(
-      final Collection<ItemPrefWithLengthWritable> prefsToCheck) {
+  static ItemPrefWithItemVectorWeightArrayWritable equalToItemPrefs(
+      final Collection<ItemPrefWithItemVectorWeightWritable> prefsToCheck) {
     EasyMock.reportMatcher(new IArgumentMatcher() {
       @Override
       public boolean matches(Object argument) {
-        if (argument instanceof ItemPrefWithLengthArrayWritable) {
-          ItemPrefWithLengthArrayWritable itemPrefArray = (ItemPrefWithLengthArrayWritable) argument;
-          Collection<ItemPrefWithLengthWritable> set = new HashSet<ItemPrefWithLengthWritable>();
+        if (argument instanceof ItemPrefWithItemVectorWeightArrayWritable) {
+          ItemPrefWithItemVectorWeightArrayWritable itemPrefArray = (ItemPrefWithItemVectorWeightArrayWritable) argument;
+          Collection<ItemPrefWithItemVectorWeightWritable> set = new HashSet<ItemPrefWithItemVectorWeightWritable>();
           set.addAll(Arrays.asList(itemPrefArray.getItemPrefs()));
 
           if (set.size() != prefsToCheck.size()) {
             return false;
           }
 
-          for (ItemPrefWithLengthWritable prefToCheck : prefsToCheck) {
+          for (ItemPrefWithItemVectorWeightWritable prefToCheck : prefsToCheck) {
             if (!set.contains(prefToCheck)) {
               return false;
             }
@@ -185,18 +189,18 @@ public class ItemSimilarityTest extends 
   }
 
   public void testCopreferredItemsMapper() throws Exception {
-    OutputCollector<ItemPairWritable,FloatWritable> output =
+    OutputCollector<ItemPairWritable, CoRating> output =
         EasyMock.createMock(OutputCollector.class);
-    ItemPrefWithLengthArrayWritable itemPrefs =
-        EasyMock.createMock(ItemPrefWithLengthArrayWritable.class);
+    ItemPrefWithItemVectorWeightArrayWritable itemPrefs =
+        EasyMock.createMock(ItemPrefWithItemVectorWeightArrayWritable.class);
 
-    EasyMock.expect(itemPrefs.getItemPrefs()).andReturn(new ItemPrefWithLengthWritable[] {
-        new ItemPrefWithLengthWritable(34L, 2.0, 1.0f), new ItemPrefWithLengthWritable(56L, 3.0, 2.0f),
-        new ItemPrefWithLengthWritable(78L, 4.0, 3.0f) });
-
-    output.collect(new ItemPairWritable(34L, 56L, 6.0), new FloatWritable(2.0f));
-    output.collect(new ItemPairWritable(34L, 78L, 8.0), new FloatWritable(3.0f));
-    output.collect(new ItemPairWritable(56L, 78L, 12.0), new FloatWritable(6.0f));
+    EasyMock.expect(itemPrefs.getItemPrefs()).andReturn(new ItemPrefWithItemVectorWeightWritable[] {
+        new ItemPrefWithItemVectorWeightWritable(34L, 2.0, 1.0f), new ItemPrefWithItemVectorWeightWritable(56L, 3.0, 2.0f),
+        new ItemPrefWithItemVectorWeightWritable(78L, 4.0, 3.0f) });
+
+    output.collect(new ItemPairWritable(34L, 56L, 2.0, 3.0), new CoRating(1.0f, 2.0f));
+    output.collect(new ItemPairWritable(34L, 78L, 2.0, 4.0), new CoRating(1.0f, 3.0f));
+    output.collect(new ItemPairWritable(56L, 78L, 3.0, 4.0), new CoRating(2.0f, 3.0f));
 
     EasyMock.replay(output, itemPrefs);
 
@@ -205,17 +209,22 @@ public class ItemSimilarityTest extends 
     EasyMock.verify(output, itemPrefs);
   }
 
-  public void testCosineSimilarityReducer() throws Exception {
+  public void testSimilarityReducer() throws Exception {
     OutputCollector<EntityEntityWritable,DoubleWritable> output =
         EasyMock.createMock(OutputCollector.class);
 
-    output.collect(new EntityEntityWritable(12L, 34L), new DoubleWritable(0.5d));
+    JobConf conf = new JobConf();
+    conf.set(ItemSimilarityJob.DISTRIBUTED_SIMILARITY_CLASSNAME,
+        "org.apache.mahout.cf.taste.hadoop.similarity.DistributedUncenteredZeroAssumingCosineSimilarity");
+
+    output.collect(new EntityEntityWritable(12L, 34L), new DoubleWritable(0.5));
 
     EasyMock.replay(output);
 
-    new CosineSimilarityReducer().reduce(new ItemPairWritable(12L, 34L, 20.0),
-        Arrays.asList(new FloatWritable(5.0f),
-                      new FloatWritable(5.0f)).iterator(), output, null);
+    SimilarityReducer reducer = new SimilarityReducer();
+    reducer.configure(conf);
+    reducer.reduce(new ItemPairWritable(12L, 34L, 2.0, 10.0), Arrays.asList(new CoRating(2.5f, 2.0f),
+            new CoRating(2.0f, 2.5f)).iterator(), output, null);
 
     EasyMock.verify(output);
   }
@@ -238,9 +247,9 @@ public class ItemSimilarityTest extends 
       /* user-item-matrix
 
                    Game   Mouse   PC    Disk
-           Jane     0       1      2      0
-           Paul     1       0      1      0
-           Fred     0       0      0      1
+           Jane     -       1      2      -
+           Paul     1       -      1      -
+           Fred     -       -      -      1
        */
 
       BufferedWriter writer = new BufferedWriter(new FileWriter(tmpDirPath+"/prefs.txt"));
@@ -263,7 +272,8 @@ public class ItemSimilarityTest extends 
 
       similarityJob.setConf(conf);
 
-      similarityJob.run(new String[] { "--tempDir", tmpDirPath+"/tmp"});
+      similarityJob.run(new String[] { "--tempDir", tmpDirPath+"/tmp", "--similarityClassname",
+          "org.apache.mahout.cf.taste.hadoop.similarity.DistributedUncenteredZeroAssumingCosineSimilarity"});
 
       String filePath = tmpDirPath+"/output/part-00000";
       BufferedReader reader = new BufferedReader(new FileReader(filePath));