You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that was not preserved in this text.
Posted to commits@mahout.apache.org by sr...@apache.org on 2010/04/23 11:34:58 UTC
svn commit: r937210 - in /lucene/mahout/trunk/core/src/main/java/org/apache/mahout: cf/taste/hadoop/ cf/taste/hadoop/item/ cf/taste/hadoop/similarity/item/ common/ math/
Author: srowen
Date: Fri Apr 23 09:34:57 2010
New Revision: 937210
URL: http://svn.apache.org/viewvc?rev=937210&view=rev
Log:
Overhauled the implementation entirely; it now distributes more of the matrix multiplication across the cluster.
Added:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java
- copied, changed from r936233, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java
- copied, changed from r936184, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
- copied, changed from r936184, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
Removed:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java
Modified:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithLengthWritable.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java
Copied: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java (from r936233, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java?p2=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java&p1=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java&r1=936233&r2=937210&rev=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java Fri Apr 23 09:34:57 2010
@@ -23,69 +23,73 @@ import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.mahout.common.RandomUtils;
+import org.apache.hadoop.io.WritableUtils;
-/** A {@link Writable} encapsulating an item ID and a preference value. */
-public final class EntityPrefWritable extends LongWritable {
-
- private float prefValue;
-
- public EntityPrefWritable() {
+/** A {@link Writable} encapsulating an item ID and a count . */
+public final class EntityCountWritable extends LongWritable implements Cloneable {
+
+ private int count;
+
+ public EntityCountWritable() {
// do nothing
}
-
- public EntityPrefWritable(long itemID, float prefValue) {
+
+ public EntityCountWritable(long itemID, int count) {
super(itemID);
- this.prefValue = prefValue;
+ this.count = count;
}
-
- public EntityPrefWritable(EntityPrefWritable other) {
- this(other.get(), other.getPrefValue());
+
+ public EntityCountWritable(EntityCountWritable other) {
+ this(other.get(), other.getCount());
}
public long getID() {
return get();
}
- public float getPrefValue() {
- return prefValue;
+ public int getCount() {
+ return count;
}
-
+
+ public void set(long id, int count) {
+ super.set(id);
+ this.count = count;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
- out.writeFloat(prefValue);
+ WritableUtils.writeVInt(out, count);
}
-
+
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
- prefValue = in.readFloat();
- }
-
- public static EntityPrefWritable read(DataInput in) throws IOException {
- EntityPrefWritable writable = new EntityPrefWritable();
- writable.readFields(in);
- return writable;
+ count = WritableUtils.readVInt(in);
}
@Override
public int hashCode() {
- return super.hashCode() ^ RandomUtils.hashFloat(prefValue);
+ return super.hashCode() ^ count;
}
@Override
public boolean equals(Object o) {
- if (!(o instanceof EntityPrefWritable)) {
+ if (!(o instanceof EntityCountWritable)) {
return false;
}
- EntityPrefWritable other = (EntityPrefWritable) o;
- return get() == other.get() && prefValue == other.getPrefValue();
+ EntityCountWritable other = (EntityCountWritable) o;
+ return get() == other.get() && count == other.getCount();
+ }
+
+ @Override
+ public String toString() {
+ return get() + "\t" + count;
}
@Override
- public EntityPrefWritable clone() {
- return new EntityPrefWritable(get(), prefValue);
+ public EntityCountWritable clone() {
+ return new EntityCountWritable(get(), count);
}
-
+
}
\ No newline at end of file
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java Fri Apr 23 09:34:57 2010
@@ -25,7 +25,8 @@ import org.apache.hadoop.io.WritableComp
import org.apache.mahout.common.RandomUtils;
/** A {@link WritableComparable} encapsulating two items. */
-public final class EntityEntityWritable implements WritableComparable<EntityEntityWritable> {
+public final class EntityEntityWritable
+ implements WritableComparable<EntityEntityWritable>, Cloneable {
private long aID;
private long bID;
@@ -46,6 +47,11 @@ public final class EntityEntityWritable
public long getBID() {
return bID;
}
+
+ public void set(long aID, long bID) {
+ this.aID = aID;
+ this.bID = bID;
+ }
@Override
public void write(DataOutput out) throws IOException {
@@ -59,12 +65,6 @@ public final class EntityEntityWritable
bID = in.readLong();
}
- public static EntityEntityWritable read(DataInput in) throws IOException {
- EntityEntityWritable writable = new EntityEntityWritable();
- writable.readFields(in);
- return writable;
- }
-
@Override
public int compareTo(EntityEntityWritable that) {
int aCompare = compare(aID, that.getAID());
@@ -93,5 +93,10 @@ public final class EntityEntityWritable
public String toString() {
return aID + "\t" + bID;
}
+
+ @Override
+ public EntityEntityWritable clone() {
+ return new EntityEntityWritable(aID, bID);
+ }
}
\ No newline at end of file
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java Fri Apr 23 09:34:57 2010
@@ -26,7 +26,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.mahout.common.RandomUtils;
/** A {@link Writable} encapsulating an item ID and a preference value. */
-public final class EntityPrefWritable extends LongWritable {
+public final class EntityPrefWritable extends LongWritable implements Cloneable {
private float prefValue;
@@ -50,6 +50,11 @@ public final class EntityPrefWritable ex
public float getPrefValue() {
return prefValue;
}
+
+ public void set(long id, float prefValue) {
+ super.set(id);
+ this.prefValue = prefValue;
+ }
@Override
public void write(DataOutput out) throws IOException {
@@ -62,12 +67,6 @@ public final class EntityPrefWritable ex
super.readFields(in);
prefValue = in.readFloat();
}
-
- public static EntityPrefWritable read(DataInput in) throws IOException {
- EntityPrefWritable writable = new EntityPrefWritable();
- writable.readFields(in);
- return writable;
- }
@Override
public int hashCode() {
@@ -84,6 +83,11 @@ public final class EntityPrefWritable ex
}
@Override
+ public String toString() {
+ return get() + "\t" + prefValue;
+ }
+
+ @Override
public EntityPrefWritable clone() {
return new EntityPrefWritable(get(), prefValue);
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java Fri Apr 23 09:34:57 2010
@@ -47,6 +47,10 @@ public final class RecommendedItemsWrita
public List<RecommendedItem> getRecommendedItems() {
return recommended;
}
+
+ public void set(List<RecommendedItem> recommended) {
+ this.recommended = recommended;
+ }
@Override
public void write(DataOutput out) throws IOException {
@@ -70,12 +74,6 @@ public final class RecommendedItemsWrita
}
}
- public static RecommendedItemsWritable read(DataInput in) throws IOException {
- RecommendedItemsWritable writable = new RecommendedItemsWritable();
- writable.readFields(in);
- return writable;
- }
-
@Override
public String toString() {
StringBuilder result = new StringBuilder(200);
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=937210&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Fri Apr 23 09:34:57 2010
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.hadoop.MapFilesMap;
+import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
+import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
+import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.Vector;
+
+public final class AggregateAndRecommendReducer extends MapReduceBase implements
+ Reducer<LongWritable,RandomAccessSparseVectorWritable,LongWritable,RecommendedItemsWritable> {
+
+ static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
+ static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
+
+ private int recommendationsPerUser;
+ private MapFilesMap<IntWritable,LongWritable> indexItemIDMap;
+
+ @Override
+ public void configure(JobConf jobConf) {
+ try {
+ FileSystem fs = FileSystem.get(jobConf);
+ Path itemIDIndexPath = new Path(jobConf.get(ITEMID_INDEX_PATH)).makeQualified(fs);
+ recommendationsPerUser = jobConf.getInt(RECOMMENDATIONS_PER_USER, 10);
+ indexItemIDMap = new MapFilesMap<IntWritable,LongWritable>(fs, itemIDIndexPath, new Configuration());
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ }
+ }
+
+ @Override
+ public void reduce(LongWritable key,
+ Iterator<RandomAccessSparseVectorWritable> values,
+ OutputCollector<LongWritable, RecommendedItemsWritable> output,
+ Reporter reporter) throws IOException {
+ if (!values.hasNext()) {
+ return;
+ }
+ Vector recommendationVector = values.next().get();
+ while (values.hasNext()) {
+ recommendationVector = recommendationVector.plus(values.next().get());
+ }
+
+ Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1,
+ Collections.reverseOrder(ByValueRecommendedItemComparator.getInstance()));
+
+ Iterator<Vector.Element> recommendationVectorIterator = recommendationVector.iterateNonZero();
+ LongWritable itemID = new LongWritable();
+ while (recommendationVectorIterator.hasNext()) {
+ Vector.Element element = recommendationVectorIterator.next();
+ int index = element.index();
+ if (topItems.size() < recommendationsPerUser) {
+ LongWritable theItemID = indexItemIDMap.get(new IntWritable(index), itemID);
+ if (theItemID != null) {
+ topItems.add(new GenericRecommendedItem(theItemID.get(), (float) element.get()));
+ } // else, huh?
+ } else if (element.get() > topItems.peek().getValue()) {
+ LongWritable theItemID = indexItemIDMap.get(new IntWritable(index), itemID);
+ if (theItemID != null) {
+ topItems.add(new GenericRecommendedItem(theItemID.get(), (float) element.get()));
+ topItems.poll();
+ } // else, huh?
+ }
+ }
+
+ if (!topItems.isEmpty()) {
+ List<RecommendedItem> recommendations = new ArrayList<RecommendedItem>(topItems.size());
+ recommendations.addAll(topItems);
+ Collections.sort(recommendations, ByValueRecommendedItemComparator.getInstance());
+ output.collect(key, new RecommendedItemsWritable(recommendations));
+ }
+ }
+
+}
\ No newline at end of file
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java?rev=937210&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java Fri Apr 23 09:34:57 2010
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.Vector;
+
+public final class AggregateCombiner extends MapReduceBase implements
+ Reducer<LongWritable,RandomAccessSparseVectorWritable,LongWritable,RandomAccessSparseVectorWritable> {
+
+ @Override
+ public void reduce(LongWritable key,
+ Iterator<RandomAccessSparseVectorWritable> values,
+ OutputCollector<LongWritable, RandomAccessSparseVectorWritable> output,
+ Reporter reporter) throws IOException {
+ if (!values.hasNext()) {
+ return;
+ }
+ Vector partial = values.next().get();
+ while (values.hasNext()) {
+ partial = partial.plus(values.next().get());
+ }
+ output.collect(key, new RandomAccessSparseVectorWritable((RandomAccessSparseVector) partial));
+ }
+
+}
\ No newline at end of file
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java?rev=937210&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java Fri Apr 23 09:34:57 2010
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+
+public final class CooccurrenceColumnWrapperMapper extends MapReduceBase implements
+ Mapper<IntWritable, RandomAccessSparseVectorWritable,IntWritable,VectorOrPrefWritable> {
+
+ @Override
+ public void map(IntWritable key,
+ RandomAccessSparseVectorWritable value,
+ OutputCollector<IntWritable,VectorOrPrefWritable> output,
+ Reporter reporter) throws IOException {
+ output.collect(key, new VectorOrPrefWritable((RandomAccessSparseVector) value.get()));
+ }
+
+}
\ No newline at end of file
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java?rev=937210&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java Fri Apr 23 09:34:57 2010
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
+import org.apache.mahout.math.function.IntIntProcedure;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
+
+public final class CooccurrenceCombiner extends MapReduceBase implements
+ Reducer<IntWritable,EntityCountWritable,IntWritable,EntityCountWritable> {
+
+ @Override
+ public void reduce(final IntWritable index1,
+ Iterator<EntityCountWritable> index2s,
+ final OutputCollector<IntWritable,EntityCountWritable> output,
+ Reporter reporter) {
+
+ OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
+ while (index2s.hasNext()) {
+ EntityCountWritable writable = index2s.next();
+ int index = (int) writable.getID();
+ int oldValue = indexCounts.get(index);
+ indexCounts.put(index, oldValue + writable.getCount());
+ }
+
+ final EntityCountWritable entityCount = new EntityCountWritable();
+ indexCounts.forEachPair(new IntIntProcedure() {
+ @Override
+ public boolean apply(int index, int count) {
+ entityCount.set(index, count);
+ try {
+ output.collect(index1, entityCount);
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ }
+ return true;
+ }
+ });
+ }
+}
\ No newline at end of file
Copied: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java (from r936184, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java?p2=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java&p1=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java&r1=936184&r2=937210&rev=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java Fri Apr 23 09:34:57 2010
@@ -19,107 +19,72 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
import java.util.Iterator;
-import java.util.PriorityQueue;
-import java.util.Queue;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
-import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.LongFloatProcedure;
+import org.apache.mahout.math.map.OpenLongFloatHashMap;
-/**
- * <h1>Input</h1>
- *
- * <p>
- * Takes user IDs as {@link LongWritable} mapped to all associated item IDs and preference values, as
- * {@link EntityPrefWritable}s.
- * </p>
- *
- * <h1>Output</h1>
- *
- * <p>
- * The same user ID mapped to a {@link RandomAccessSparseVector} representation of the same item IDs and
- * preference values. Item IDs are used as vector indexes; they are hashed into ints to work as indexes with
- * {@link ItemIDIndexMapper#idToIndex(long)}. The mapping is remembered for later with a combination of
- * {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}.
- * </p>
- *
- * <p>
- * The number of non-default elements actually retained in the user vector is capped at
- * {@link #MAX_PREFS_CONSIDERED}.
- * </p>
- *
- */
-public final class ToUserVectorReducer extends MapReduceBase implements
- Reducer<LongWritable,LongWritable,LongWritable,VectorWritable> {
-
- public static final int MAX_PREFS_CONSIDERED = 20;
-
- private boolean booleanData;
+public final class PartialMultiplyReducer extends MapReduceBase implements
+ Reducer<IntWritable,VectorOrPrefWritable,LongWritable, RandomAccessSparseVectorWritable> {
@Override
- public void configure(JobConf jobConf) {
- booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
- }
-
- @Override
- public void reduce(LongWritable userID,
- Iterator<LongWritable> itemPrefs,
- OutputCollector<LongWritable,VectorWritable> output,
+ public void reduce(IntWritable key,
+ Iterator<VectorOrPrefWritable> values,
+ final OutputCollector<LongWritable,RandomAccessSparseVectorWritable> output,
Reporter reporter) throws IOException {
- if (itemPrefs.hasNext()) {
- RandomAccessSparseVector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
- while (itemPrefs.hasNext()) {
- LongWritable itemPref = itemPrefs.next();
- int index = ItemIDIndexMapper.idToIndex(itemPref.get());
- float value;
- if (itemPref instanceof EntityPrefWritable) {
- value = ((EntityPrefWritable) itemPref).getPrefValue();
+ OpenLongFloatHashMap savedValues = new OpenLongFloatHashMap();
+ Vector cooccurrenceColumn = null;
+ final int itemIndex = key.get();
+ final LongWritable userIDWritable = new LongWritable();
+ final RandomAccessSparseVectorWritable vectorWritable = new RandomAccessSparseVectorWritable();
+ while (values.hasNext()) {
+ VectorOrPrefWritable value = values.next();
+ if (value.getVector() == null) {
+ // Then this is a user-pref value
+ long userID = value.getUserID();
+ float preferenceValue = value.getValue();
+ if (cooccurrenceColumn == null) {
+ // Haven't seen the co-occurrencce column yet; save it
+ savedValues.put(userID, preferenceValue);
} else {
- value = 1.0f;
+ // Have seen it
+ Vector partialProduct = cooccurrenceColumn.times(preferenceValue);
+ // This makes sure this item isn't recommended for this user:
+ partialProduct.set(itemIndex, Double.NEGATIVE_INFINITY);
+ userIDWritable.set(userID);
+ vectorWritable.set(partialProduct);
+ output.collect(userIDWritable, vectorWritable);
}
- userVector.set(index, value);
- }
-
- if (!booleanData && userVector.getNumNondefaultElements() > MAX_PREFS_CONSIDERED) {
- double cutoff = findTopNPrefsCutoff(MAX_PREFS_CONSIDERED,
- userVector);
- RandomAccessSparseVector filteredVector = new RandomAccessSparseVector(Integer.MAX_VALUE,
- MAX_PREFS_CONSIDERED);
- Iterator<Vector.Element> it = userVector.iterateNonZero();
- while (it.hasNext()) {
- Vector.Element element = it.next();
- if (element.get() >= cutoff) {
- filteredVector.set(element.index(), element.get());
+ } else {
+ cooccurrenceColumn = value.getVector();
+ final Vector theColumn = cooccurrenceColumn;
+ savedValues.forEachPair(new LongFloatProcedure() {
+ @Override
+ public boolean apply(long userID, float value) {
+ Vector partialProduct = theColumn.times(value);
+ // This makes sure this item isn't recommended for this user:
+ partialProduct.set(itemIndex, Double.NEGATIVE_INFINITY);
+ userIDWritable.set(userID);
+ vectorWritable.set(partialProduct);
+ try {
+ output.collect(userIDWritable, vectorWritable);
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ }
+ return true;
}
- }
- userVector = filteredVector;
- }
-
- VectorWritable writable = new VectorWritable(userVector);
- output.collect(userID, writable);
- }
- }
-
- private static double findTopNPrefsCutoff(int n, Vector userVector) {
- Queue<Double> topPrefValues = new PriorityQueue<Double>(n + 1);
- Iterator<Vector.Element> it = userVector.iterateNonZero();
- while (it.hasNext()) {
- double prefValue = it.next().get();
- if (topPrefValues.size() < n) {
- topPrefValues.add(prefValue);
- } else if (prefValue > topPrefValues.peek()) {
- topPrefValues.add(prefValue);
- topPrefValues.poll();
+ });
+ savedValues.clear();
}
}
- return topPrefValues.peek();
+
}
-
-}
+
+}
\ No newline at end of file
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Fri Apr 23 09:34:57 2010
@@ -22,6 +22,8 @@ import java.util.Map;
import org.apache.commons.cli2.Option;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -29,17 +31,20 @@ import org.apache.hadoop.io.compress.Gzi
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
-import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
-import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
/**
* <p>Runs a completely distributed recommender job as a series of mapreduces.</p>
@@ -91,38 +96,62 @@ public final class RecommenderJob extend
String userVectorPath = tempDirPath + "/userVectors";
String itemIDIndexPath = tempDirPath + "/itemIDIndex";
String cooccurrencePath = tempDirPath + "/cooccurrence";
-
- JobConf itemIDIndexConf = prepareJobConf(inputPath, itemIDIndexPath,
- TextInputFormat.class, ItemIDIndexMapper.class, IntWritable.class, LongWritable.class,
- ItemIDIndexReducer.class, IntWritable.class, LongWritable.class, MapFileOutputFormat.class);
+ String parialMultiplyPath = tempDirPath + "/partialMultiply";
+
+ JobConf itemIDIndexConf = prepareJobConf(
+ inputPath, itemIDIndexPath, TextInputFormat.class,
+ ItemIDIndexMapper.class, IntWritable.class, LongWritable.class,
+ ItemIDIndexReducer.class, IntWritable.class, LongWritable.class,
+ MapFileOutputFormat.class);
JobClient.runJob(itemIDIndexConf);
- JobConf toUserVectorConf = prepareJobConf(inputPath, userVectorPath,
- TextInputFormat.class, ToItemPrefsMapper.class, LongWritable.class,
- booleanData ? LongWritable.class : EntityPrefWritable.class,
- ToUserVectorReducer.class, LongWritable.class, VectorWritable.class, SequenceFileOutputFormat.class);
+ JobConf toUserVectorConf = prepareJobConf(
+ inputPath, userVectorPath, TextInputFormat.class,
+ ToItemPrefsMapper.class, LongWritable.class, booleanData ? LongWritable.class : EntityPrefWritable.class,
+ ToUserVectorReducer.class, LongWritable.class, RandomAccessSparseVectorWritable.class,
+ SequenceFileOutputFormat.class);
toUserVectorConf.setBoolean(BOOLEAN_DATA, booleanData);
JobClient.runJob(toUserVectorConf);
-
- JobConf toCooccurrenceConf = prepareJobConf(userVectorPath, cooccurrencePath,
- SequenceFileInputFormat.class, UserVectorToCooccurrenceMapper.class, IntWritable.class,
- IntWritable.class, UserVectorToCooccurrenceReducer.class, IntWritable.class, VectorWritable.class,
- MapFileOutputFormat.class);
+
+ JobConf toCooccurrenceConf = prepareJobConf(
+ userVectorPath, cooccurrencePath, SequenceFileInputFormat.class,
+ UserVectorToCooccurrenceMapper.class, IntWritable.class, EntityCountWritable.class,
+ UserVectorToCooccurrenceReducer.class, IntWritable.class, RandomAccessSparseVectorWritable.class,
+ SequenceFileOutputFormat.class);
+ toCooccurrenceConf.setInt("io.sort.mb", 600);
+ toCooccurrenceConf.setClass("mapred.combiner.class", CooccurrenceCombiner.class, Reducer.class);
JobClient.runJob(toCooccurrenceConf);
-
- JobConf recommenderConf = prepareJobConf(userVectorPath, outputPath,
- SequenceFileInputFormat.class, RecommenderMapper.class, LongWritable.class,
- RecommendedItemsWritable.class, IdentityReducer.class, LongWritable.class,
- RecommendedItemsWritable.class, TextOutputFormat.class);
- recommenderConf.setBoolean(BOOLEAN_DATA, booleanData);
- recommenderConf.set(RecommenderMapper.COOCCURRENCE_PATH, cooccurrencePath);
- recommenderConf.set(RecommenderMapper.ITEMID_INDEX_PATH, itemIDIndexPath);
- recommenderConf.setInt(RecommenderMapper.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
+
+ JobConf partialMultiplyConf = prepareJobConf(
+ cooccurrencePath, parialMultiplyPath, SequenceFileInputFormat.class,
+ CooccurrenceColumnWrapperMapper.class, IntWritable.class, VectorOrPrefWritable.class,
+ PartialMultiplyReducer.class, LongWritable.class, RandomAccessSparseVectorWritable.class,
+ SequenceFileOutputFormat.class);
+ MultipleInputs.addInputPath(
+ partialMultiplyConf,
+ new Path(cooccurrencePath).makeQualified(FileSystem.get(partialMultiplyConf)),
+ SequenceFileInputFormat.class, CooccurrenceColumnWrapperMapper.class);
+ MultipleInputs.addInputPath(
+ partialMultiplyConf,
+ new Path(userVectorPath).makeQualified(FileSystem.get(partialMultiplyConf)),
+ SequenceFileInputFormat.class, UserVectorSplitterMapper.class);
if (usersFile != null) {
- recommenderConf.set(RecommenderMapper.USERS_FILE, usersFile);
+ partialMultiplyConf.set(UserVectorSplitterMapper.USERS_FILE, usersFile);
}
- recommenderConf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
- JobClient.runJob(recommenderConf);
+ JobClient.runJob(partialMultiplyConf);
+
+ JobConf aggregateAndRecommendConf = prepareJobConf(
+ parialMultiplyPath, outputPath, SequenceFileInputFormat.class,
+ IdentityMapper.class, LongWritable.class, RandomAccessSparseVectorWritable.class,
+ AggregateAndRecommendReducer.class, LongWritable.class, RecommendedItemsWritable.class,
+ TextOutputFormat.class);
+ aggregateAndRecommendConf.setInt("io.sort.mb", 600);
+ aggregateAndRecommendConf.setClass("mapred.combiner.class", AggregateCombiner.class, Reducer.class);
+ aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath);
+ aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
+ aggregateAndRecommendConf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
+ JobClient.runJob(aggregateAndRecommendConf);
+
return 0;
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java Fri Apr 23 09:34:57 2010
@@ -30,8 +30,8 @@ import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
/**
* <h1>Input</h1>
@@ -57,7 +57,7 @@ import org.apache.mahout.math.VectorWrit
*
*/
public final class ToUserVectorReducer extends MapReduceBase implements
- Reducer<LongWritable,LongWritable,LongWritable,VectorWritable> {
+ Reducer<LongWritable,LongWritable,LongWritable, RandomAccessSparseVectorWritable> {
public static final int MAX_PREFS_CONSIDERED = 20;
@@ -71,42 +71,43 @@ public final class ToUserVectorReducer e
@Override
public void reduce(LongWritable userID,
Iterator<LongWritable> itemPrefs,
- OutputCollector<LongWritable,VectorWritable> output,
+ OutputCollector<LongWritable,RandomAccessSparseVectorWritable> output,
Reporter reporter) throws IOException {
- if (itemPrefs.hasNext()) {
- RandomAccessSparseVector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
- while (itemPrefs.hasNext()) {
- LongWritable itemPref = itemPrefs.next();
- int index = ItemIDIndexMapper.idToIndex(itemPref.get());
- float value;
- if (itemPref instanceof EntityPrefWritable) {
- value = ((EntityPrefWritable) itemPref).getPrefValue();
- } else {
- value = 1.0f;
- }
- userVector.set(index, value);
+ if (!itemPrefs.hasNext()) {
+ return;
+ }
+ RandomAccessSparseVector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+ while (itemPrefs.hasNext()) {
+ LongWritable itemPref = itemPrefs.next();
+ int index = ItemIDIndexMapper.idToIndex(itemPref.get());
+ float value;
+ if (itemPref instanceof EntityPrefWritable) {
+ value = ((EntityPrefWritable) itemPref).getPrefValue();
+ } else {
+ value = 1.0f;
}
-
- if (!booleanData && userVector.getNumNondefaultElements() > MAX_PREFS_CONSIDERED) {
- double cutoff = findTopNPrefsCutoff(MAX_PREFS_CONSIDERED,
- userVector);
- RandomAccessSparseVector filteredVector = new RandomAccessSparseVector(Integer.MAX_VALUE,
- MAX_PREFS_CONSIDERED);
- Iterator<Vector.Element> it = userVector.iterateNonZero();
- while (it.hasNext()) {
- Vector.Element element = it.next();
- if (element.get() >= cutoff) {
- filteredVector.set(element.index(), element.get());
- }
+ userVector.set(index, value);
+ }
+
+ if (!booleanData && userVector.getNumNondefaultElements() > MAX_PREFS_CONSIDERED) {
+ double cutoff = findTopNPrefsCutoff(MAX_PREFS_CONSIDERED,
+ userVector);
+ RandomAccessSparseVector filteredVector = new RandomAccessSparseVector(Integer.MAX_VALUE,
+ MAX_PREFS_CONSIDERED);
+ Iterator<Vector.Element> it = userVector.iterateNonZero();
+ while (it.hasNext()) {
+ Vector.Element element = it.next();
+ if (element.get() >= cutoff) {
+ filteredVector.set(element.index(), element.get());
}
- userVector = filteredVector;
}
-
- VectorWritable writable = new VectorWritable(userVector);
- output.collect(userID, writable);
+ userVector = filteredVector;
}
+
+ RandomAccessSparseVectorWritable writable = new RandomAccessSparseVectorWritable(userVector);
+ output.collect(userID, writable);
}
-
+
private static double findTopNPrefsCutoff(int n, Vector userVector) {
Queue<Double> topPrefValues = new PriorityQueue<Double>(n + 1);
Iterator<Vector.Element> it = userVector.iterateNonZero();
Copied: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java (from r936184, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java?p2=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java&p1=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java&r1=936184&r2=937210&rev=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java Fri Apr 23 09:34:57 2010
@@ -18,34 +18,71 @@
package org.apache.mahout.cf.taste.hadoop.item;
import java.io.IOException;
-import java.util.regex.Pattern;
+import java.util.Iterator;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.common.FileLineIterable;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.Vector;
-public final class ItemIDIndexMapper extends MapReduceBase implements
- Mapper<LongWritable,Text,IntWritable,LongWritable> {
-
- private static final Pattern COMMA = Pattern.compile(",");
+public final class UserVectorSplitterMapper extends MapReduceBase implements
+ Mapper<LongWritable,RandomAccessSparseVectorWritable,IntWritable,VectorOrPrefWritable> {
+
+ static final String USERS_FILE = "usersFile";
+ private FastIDSet usersToRecommendFor;
+
+  /**
+   * Loads the optional set of user IDs to produce recommendations for.
+   *
+   * <p>If {@link #USERS_FILE} is not set in the job configuration, {@code usersToRecommendFor}
+   * is left {@code null}, which {@code map} treats as "emit data for every user". Otherwise the
+   * file is read one user ID per line. Surrounding whitespace is ignored and blank lines are
+   * skipped.</p>
+   *
+   * @throws IllegalStateException wrapping any {@link IOException} raised while opening or
+   *  reading the users file
+   */
+  @Override
+  public void configure(JobConf jobConf) {
+    String usersFilePathString = jobConf.get(USERS_FILE);
+    if (usersFilePathString == null) {
+      usersToRecommendFor = null;
+      return;
+    }
+    try {
+      FileSystem fs = FileSystem.get(jobConf);
+      usersToRecommendFor = new FastIDSet();
+      Path usersFilePath = new Path(usersFilePathString).makeQualified(fs);
+      FSDataInputStream in = fs.open(usersFilePath);
+      try {
+        for (String line : new FileLineIterable(in)) {
+          String trimmed = line.trim();
+          // Tolerate blank lines (e.g. a trailing newline) instead of failing the task.
+          if (trimmed.length() > 0) {
+            usersToRecommendFor.add(Long.parseLong(trimmed));
+          }
+        }
+      } finally {
+        // Ensure the stream is released even if a line fails to parse part-way through;
+        // closing an already-closed stream is a no-op per the Closeable contract.
+        in.close();
+      }
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+  }
+
@Override
public void map(LongWritable key,
- Text value,
- OutputCollector<IntWritable,LongWritable> output,
+ RandomAccessSparseVectorWritable value,
+ OutputCollector<IntWritable,VectorOrPrefWritable> output,
Reporter reporter) throws IOException {
- String[] tokens = ItemIDIndexMapper.COMMA.split(value.toString());
- long itemID = Long.parseLong(tokens[1]);
- int index = idToIndex(itemID);
- output.collect(new IntWritable(index), new LongWritable(itemID));
- }
-
- static int idToIndex(long itemID) {
- return 0x7FFFFFFF & ((int) itemID ^ (int) (itemID >>> 32));
+ long userID = key.get();
+ if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
+ return;
+ }
+ Vector userVector = value.get();
+ Iterator<Vector.Element> it = userVector.iterateNonZero();
+ IntWritable itemIndexWritable = new IntWritable();
+ VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
+ while (it.hasNext()) {
+ Vector.Element e = it.next();
+ int itemIndex = e.index();
+ double preferenceValue = e.get();
+ itemIndexWritable.set(itemIndex);
+ vectorOrPref.set(userID, (float) preferenceValue);
+ output.collect(itemIndexWritable, vectorOrPref);
+ }
}
-
+
}
\ No newline at end of file
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java Fri Apr 23 09:34:57 2010
@@ -26,29 +26,29 @@ import org.apache.hadoop.mapred.MapReduc
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
public final class UserVectorToCooccurrenceMapper extends MapReduceBase implements
- Mapper<LongWritable,VectorWritable,IntWritable,IntWritable> {
+ Mapper<LongWritable, RandomAccessSparseVectorWritable,IntWritable, EntityCountWritable> {
@Override
public void map(LongWritable userID,
- VectorWritable userVector,
- OutputCollector<IntWritable,IntWritable> output,
+ RandomAccessSparseVectorWritable userVector,
+ OutputCollector<IntWritable,EntityCountWritable> output,
Reporter reporter) throws IOException {
Iterator<Vector.Element> it = userVector.get().iterateNonZero();
+ EntityCountWritable entityCount = new EntityCountWritable();
+ IntWritable writable1 = new IntWritable();
while (it.hasNext()) {
- Vector.Element next1 = it.next();
- int index1 = next1.index();
+ int index1 = it.next().index();
+ writable1.set(index1);
Iterator<Vector.Element> it2 = userVector.get().iterateNonZero();
- IntWritable itemWritable1 = new IntWritable(index1);
while (it2.hasNext()) {
- Vector.Element next2 = it2.next();
- int index2 = next2.index();
- if (index1 != index2) {
- output.collect(itemWritable1, new IntWritable(index2));
- }
+ int index2 = it2.next().index();
+ entityCount.set(index2, 1);
+ output.collect(writable1, entityCount);
}
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java Fri Apr 23 09:34:57 2010
@@ -25,34 +25,39 @@ import org.apache.hadoop.mapred.MapReduc
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.function.IntIntProcedure;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
public final class UserVectorToCooccurrenceReducer extends MapReduceBase implements
- Reducer<IntWritable,IntWritable,IntWritable,VectorWritable> {
+ Reducer<IntWritable, EntityCountWritable,IntWritable, RandomAccessSparseVectorWritable> {
@Override
public void reduce(IntWritable index1,
- Iterator<IntWritable> index2s,
- OutputCollector<IntWritable,VectorWritable> output,
+ Iterator<EntityCountWritable> index2s,
+ OutputCollector<IntWritable,RandomAccessSparseVectorWritable> output,
Reporter reporter) throws IOException {
- if (index2s.hasNext()) {
- RandomAccessSparseVector cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 1000);
- while (index2s.hasNext()) {
- int index2 = index2s.next().get();
- cooccurrenceRow.set(index2, cooccurrenceRow.get(index2) + 1.0);
- }
- Iterator<Vector.Element> cooccurrences = cooccurrenceRow.iterateNonZero();
- while (cooccurrences.hasNext()) {
- Vector.Element element = cooccurrences.next();
- if (element.get() <= 1.0) { // purge small values
- element.set(0.0);
- }
- }
- VectorWritable writable = new VectorWritable(cooccurrenceRow);
- output.collect(index1, writable);
+
+ OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
+ while (index2s.hasNext()) {
+ EntityCountWritable writable = index2s.next();
+ int index = (int) writable.getID();
+ int oldValue = indexCounts.get(index);
+ indexCounts.put(index, oldValue + writable.getCount());
}
+
+ final RandomAccessSparseVector cooccurrenceRow =
+ new RandomAccessSparseVector(Integer.MAX_VALUE, 1000);
+ indexCounts.forEachPair(new IntIntProcedure() {
+ @Override
+ public boolean apply(int index, int count) {
+ cooccurrenceRow.set(index, count);
+ return true;
+ }
+ });
+ output.collect(index1, new RandomAccessSparseVectorWritable(cooccurrenceRow));
}
}
\ No newline at end of file
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java?rev=937210&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java Fri Apr 23 09:34:57 2010
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.Vector;
+
+/**
+ * <p>A {@link Writable} that carries either a cooccurrence column (a
+ * {@link RandomAccessSparseVector}) or a single (user ID, preference value) pair, so that both
+ * kinds of record can be sent to the same reducer key.</p>
+ *
+ * <p>When {@link #getVector()} is non-null the instance holds a vector. In that case the user ID
+ * is {@link Long#MIN_VALUE} and the value is {@link Float#NaN}. When {@link #getVector()} is
+ * {@code null}, the instance holds a preference.</p>
+ *
+ * <p>Serialized form: a leading boolean discriminator ({@code true} = vector) followed by either
+ * the serialized vector or the user ID ({@code long}) and preference value ({@code float}).</p>
+ */
+public final class VectorOrPrefWritable implements Writable {
+
+  private RandomAccessSparseVector vector;
+  private long userID;
+  private float value;
+
+  /** Creates an empty instance, to be populated via {@link #readFields(DataInput)} or a setter. */
+  public VectorOrPrefWritable() {
+  }
+
+  /** Creates an instance holding the given vector; user ID and value are set to sentinels. */
+  public VectorOrPrefWritable(RandomAccessSparseVector vector) {
+    // Delegate so construction leaves the same state as set(vector).
+    set(vector);
+  }
+
+  /** Creates an instance holding the given user ID and preference value. */
+  public VectorOrPrefWritable(long userID, float value) {
+    set(userID, value);
+  }
+
+  /** @return the held vector, or {@code null} if this instance holds a preference */
+  public Vector getVector() {
+    return vector;
+  }
+
+  /** @return the held user ID, or {@link Long#MIN_VALUE} if this instance holds a vector */
+  public long getUserID() {
+    return userID;
+  }
+
+  /** @return the held preference value, or {@link Float#NaN} if this instance holds a vector */
+  public float getValue() {
+    return value;
+  }
+
+  /** Makes this instance hold {@code vector}, resetting the preference fields to sentinels. */
+  public void set(RandomAccessSparseVector vector) {
+    this.vector = vector;
+    this.userID = Long.MIN_VALUE;
+    this.value = Float.NaN;
+  }
+
+  /** Makes this instance hold the given preference, clearing any held vector. */
+  public void set(long userID, float value) {
+    this.vector = null;
+    this.userID = userID;
+    this.value = value;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (vector == null) {
+      out.writeBoolean(false);
+      out.writeLong(userID);
+      out.writeFloat(value);
+    } else {
+      out.writeBoolean(true);
+      new RandomAccessSparseVectorWritable(vector).write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    boolean hasVector = in.readBoolean();
+    if (hasVector) {
+      RandomAccessSparseVectorWritable writable = new RandomAccessSparseVectorWritable();
+      writable.readFields(in);
+      set((RandomAccessSparseVector) writable.get());
+    } else {
+      long theUserID = in.readLong();
+      float theValue = in.readFloat();
+      set(theUserID, theValue);
+    }
+  }
+
+}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java Fri Apr 23 09:34:57 2010
@@ -59,7 +59,8 @@ public final class ItemPairWritable impl
@Override
public void readFields(DataInput in) throws IOException {
- itemItemWritable = EntityEntityWritable.read(in);
+ itemItemWritable = new EntityEntityWritable();
+ itemItemWritable.readFields(in);
multipliedLength = in.readDouble();
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithLengthWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithLengthWritable.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithLengthWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithLengthWritable.java Fri Apr 23 09:34:57 2010
@@ -64,7 +64,8 @@ public final class ItemPrefWithLengthWri
@Override
public void readFields(DataInput in) throws IOException {
- itemPref = EntityPrefWritable.read(in);
+ itemPref = new EntityPrefWritable();
+ itemPref.readFields(in);
length = in.readDouble();
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Fri Apr 23 09:34:57 2010
@@ -176,10 +176,13 @@ public abstract class AbstractJob extend
jobConf.setBoolean("mapred.output.compress", true);
// otherwise leave it to its default value
}
+ jobConf.setCompressMapOutput(true);
+
String customJobName = jobConf.get("mapred.job.name");
- if (customJobName != null) {
- jobConf.set("mapred.job.name", customJobName + '-' + mapper.getSimpleName() + '-' + reducer.getSimpleName());
+ if (customJobName == null) {
+ customJobName = getClass().getSimpleName();
}
+ jobConf.set("mapred.job.name", customJobName + '-' + mapper.getSimpleName() + '-' + reducer.getSimpleName());
jobConf.setClass("mapred.output.format.class", outputFormat, OutputFormat.class);
// Override this:
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java Fri Apr 23 09:34:57 2010
@@ -40,7 +40,7 @@ public class VectorWritable extends Conf
return vector;
}
- protected void set(Vector vector) {
+ public void set(Vector vector) {
this.vector = vector;
}