You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2010/04/28 06:37:20 UTC
svn commit: r938782 - in
/lucene/mahout/trunk/core/src/main/java/org/apache/mahout: cf/taste/hadoop/
cf/taste/hadoop/cooccurence/ cf/taste/hadoop/item/ common/
Author: srowen
Date: Wed Apr 28 04:37:19 2010
New Revision: 938782
URL: http://svn.apache.org/viewvc?rev=938782&view=rev
Log:
(More of MAHOUT-385 and) Another round of refinements to item-based recommender; per MAHOUT-305 combine the two implementations
Added:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/FirstIndexPartitioner.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java
- copied, changed from r937991, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java
- copied, changed from r937991, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/TupleWritable.java
Removed:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MapFilesMap.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/
Modified:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Wed Apr 28 04:37:19 2010
@@ -25,40 +25,60 @@ import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.cf.taste.hadoop.MapFilesMap;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
public final class AggregateAndRecommendReducer extends MapReduceBase implements
- Reducer<LongWritable,RandomAccessSparseVectorWritable,LongWritable,RecommendedItemsWritable> {
+ Reducer<LongWritable,VectorWritable,LongWritable,RecommendedItemsWritable> {
static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
+ private static final PathFilter PARTS_FILTER = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith("part-");
+ }
+ };
+
private int recommendationsPerUser;
- private MapFilesMap<IntWritable,LongWritable> indexItemIDMap;
+ private OpenIntLongHashMap indexItemIDMap;
@Override
public void configure(JobConf jobConf) {
+ recommendationsPerUser = jobConf.getInt(RECOMMENDATIONS_PER_USER, 10);
try {
FileSystem fs = FileSystem.get(jobConf);
Path itemIDIndexPath = new Path(jobConf.get(ITEMID_INDEX_PATH)).makeQualified(fs);
- recommendationsPerUser = jobConf.getInt(RECOMMENDATIONS_PER_USER, 10);
- indexItemIDMap = new MapFilesMap<IntWritable,LongWritable>(fs, itemIDIndexPath, new Configuration());
+ indexItemIDMap = new OpenIntLongHashMap();
+ IntWritable index = new IntWritable();
+ LongWritable id = new LongWritable();
+ for (FileStatus status : fs.listStatus(itemIDIndexPath, PARTS_FILTER)) {
+ String path = status.getPath().toString();
+ SequenceFile.Reader reader =
+ new SequenceFile.Reader(fs, new Path(path).makeQualified(fs), jobConf);
+ while (reader.next(index, id)) {
+ indexItemIDMap.put(index.get(), id.get());
+ }
+ reader.close();
+ }
} catch (IOException ioe) {
throw new IllegalStateException(ioe);
}
@@ -66,7 +86,7 @@ public final class AggregateAndRecommend
@Override
public void reduce(LongWritable key,
- Iterator<RandomAccessSparseVectorWritable> values,
+ Iterator<VectorWritable> values,
OutputCollector<LongWritable, RecommendedItemsWritable> output,
Reporter reporter) throws IOException {
if (!values.hasNext()) {
@@ -80,22 +100,18 @@ public final class AggregateAndRecommend
Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1,
Collections.reverseOrder(ByValueRecommendedItemComparator.getInstance()));
- Iterator<Vector.Element> recommendationVectorIterator = recommendationVector.iterateNonZero();
- LongWritable itemID = new LongWritable();
+ Iterator<Vector.Element> recommendationVectorIterator =
+ recommendationVector.iterateNonZero();
while (recommendationVectorIterator.hasNext()) {
Vector.Element element = recommendationVectorIterator.next();
int index = element.index();
if (topItems.size() < recommendationsPerUser) {
- LongWritable theItemID = indexItemIDMap.get(new IntWritable(index), itemID);
- if (theItemID != null) {
- topItems.add(new GenericRecommendedItem(theItemID.get(), (float) element.get()));
- } // else, huh?
+ long theItemID = indexItemIDMap.get(index);
+ topItems.add(new GenericRecommendedItem(theItemID, (float) element.get()));
} else if (element.get() > topItems.peek().getValue()) {
- LongWritable theItemID = indexItemIDMap.get(new IntWritable(index), itemID);
- if (theItemID != null) {
- topItems.add(new GenericRecommendedItem(theItemID.get(), (float) element.get()));
- topItems.poll();
- } // else, huh?
+ long theItemID = indexItemIDMap.get(index);
+ topItems.add(new GenericRecommendedItem(theItemID, (float) element.get()));
+ topItems.poll();
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java Wed Apr 28 04:37:19 2010
@@ -25,17 +25,16 @@ import org.apache.hadoop.mapred.MapReduc
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
public final class AggregateCombiner extends MapReduceBase implements
- Reducer<LongWritable,RandomAccessSparseVectorWritable,LongWritable,RandomAccessSparseVectorWritable> {
+ Reducer<LongWritable,VectorWritable,LongWritable,VectorWritable> {
@Override
public void reduce(LongWritable key,
- Iterator<RandomAccessSparseVectorWritable> values,
- OutputCollector<LongWritable, RandomAccessSparseVectorWritable> output,
+ Iterator<VectorWritable> values,
+ OutputCollector<LongWritable, VectorWritable> output,
Reporter reporter) throws IOException {
if (!values.hasNext()) {
return;
@@ -44,7 +43,7 @@ public final class AggregateCombiner ext
while (values.hasNext()) {
partial = partial.plus(values.next().get());
}
- output.collect(key, new RandomAccessSparseVectorWritable((RandomAccessSparseVector) partial));
+ output.collect(key, new VectorWritable(partial));
}
}
\ No newline at end of file
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java Wed Apr 28 04:37:19 2010
@@ -24,18 +24,17 @@ import org.apache.hadoop.mapred.MapReduc
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.VectorWritable;
public final class CooccurrenceColumnWrapperMapper extends MapReduceBase implements
- Mapper<IntWritable, RandomAccessSparseVectorWritable,IntWritable,VectorOrPrefWritable> {
+ Mapper<IntWritable, VectorWritable,IntWritable,VectorOrPrefWritable> {
@Override
public void map(IntWritable key,
- RandomAccessSparseVectorWritable value,
+ VectorWritable value,
OutputCollector<IntWritable,VectorOrPrefWritable> output,
Reporter reporter) throws IOException {
- output.collect(key, new VectorOrPrefWritable((RandomAccessSparseVector) value.get()));
+ output.collect(key, new VectorOrPrefWritable(value.get()));
}
}
\ No newline at end of file
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java Wed Apr 28 04:37:19 2010
@@ -25,39 +25,36 @@ import org.apache.hadoop.mapred.MapReduc
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
-import org.apache.mahout.math.function.IntIntProcedure;
-import org.apache.mahout.math.map.OpenIntIntHashMap;
public final class CooccurrenceCombiner extends MapReduceBase implements
- Reducer<IntWritable,EntityCountWritable,IntWritable,EntityCountWritable> {
+ Reducer<IndexIndexWritable,IntWritable,IndexIndexWritable,IntWritable> {
- @Override
- public void reduce(final IntWritable index1,
- Iterator<EntityCountWritable> index2s,
- final OutputCollector<IntWritable,EntityCountWritable> output,
- Reporter reporter) {
+ private IndexIndexWritable lastEntityEntity =
+ new IndexIndexWritable(Integer.MIN_VALUE, Integer.MIN_VALUE);
+ private int count = 0;
- OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
- while (index2s.hasNext()) {
- EntityCountWritable writable = index2s.next();
- int index = (int) writable.getID();
- int oldValue = indexCounts.get(index);
- indexCounts.put(index, oldValue + writable.getCount());
+ @Override
+ public void reduce(IndexIndexWritable entityEntity,
+ Iterator<IntWritable> counts,
+ OutputCollector<IndexIndexWritable,IntWritable> output,
+ Reporter reporter) throws IOException {
+ if (entityEntity.equals(lastEntityEntity)) {
+ count += sum(counts);
+ } else {
+ if (count > 0) {
+ output.collect(lastEntityEntity, new IntWritable(count));
+ }
+ count = sum(counts);
+ lastEntityEntity = entityEntity.clone();
}
+ }
- final EntityCountWritable entityCount = new EntityCountWritable();
- indexCounts.forEachPair(new IntIntProcedure() {
- @Override
- public boolean apply(int index, int count) {
- entityCount.set(index, count);
- try {
- output.collect(index1, entityCount);
- } catch (IOException ioe) {
- throw new IllegalStateException(ioe);
- }
- return true;
- }
- });
+ static int sum(Iterator<IntWritable> it) {
+ int sum = 0;
+ while (it.hasNext()) {
+ sum += it.next().get();
+ }
+ return sum;
}
+
}
\ No newline at end of file
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/FirstIndexPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/FirstIndexPartitioner.java?rev=938782&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/FirstIndexPartitioner.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/FirstIndexPartitioner.java Wed Apr 28 04:37:19 2010
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+public final class FirstIndexPartitioner<V> implements Partitioner<IndexIndexWritable,V> {
+
+ @Override
+ public void configure(JobConf job) {
+ // do nothing
+ }
+
+ @Override
+ public int getPartition(IndexIndexWritable key, V value, int numPartitions) {
+ return key.getAID() % numPartitions;
+ }
+
+}
Copied: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java (from r937991, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java?p2=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java&p1=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java&r1=937991&r2=938782&rev=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java Wed Apr 28 04:37:19 2010
@@ -15,88 +15,87 @@
* limitations under the License.
*/
-package org.apache.mahout.cf.taste.hadoop;
+package org.apache.mahout.cf.taste.hadoop.item;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.mahout.common.RandomUtils;
-/** A {@link WritableComparable} encapsulating two items. */
-public final class EntityEntityWritable
- implements WritableComparable<EntityEntityWritable>, Cloneable {
-
- private long aID;
- private long bID;
-
- public EntityEntityWritable() {
+/** A {@link WritableComparable} encapsulating two item indices. */
+public final class IndexIndexWritable
+ implements WritableComparable<IndexIndexWritable>, Cloneable {
+
+ private int aID;
+ private int bID;
+
+ public IndexIndexWritable() {
// do nothing
}
-
- public EntityEntityWritable(long aID, long bID) {
+
+ public IndexIndexWritable(int aID, int bID) {
this.aID = aID;
this.bID = bID;
}
-
- public long getAID() {
+
+ public int getAID() {
return aID;
}
-
- public long getBID() {
+
+ public int getBID() {
return bID;
}
- public void set(long aID, long bID) {
+ public void set(int aID, int bID) {
this.aID = aID;
this.bID = bID;
}
-
+
@Override
public void write(DataOutput out) throws IOException {
- out.writeLong(aID);
- out.writeLong(bID);
+ out.writeInt(aID);
+ out.writeInt(bID);
}
-
+
@Override
public void readFields(DataInput in) throws IOException {
- aID = in.readLong();
- bID = in.readLong();
+ aID = in.readInt();
+ bID = in.readInt();
}
-
+
@Override
- public int compareTo(EntityEntityWritable that) {
+ public int compareTo(IndexIndexWritable that) {
int aCompare = compare(aID, that.getAID());
return aCompare == 0 ? compare(bID, that.getBID()) : aCompare;
}
-
- private static int compare(long a, long b) {
+
+ private static int compare(int a, int b) {
return a < b ? -1 : a > b ? 1 : 0;
}
-
+
@Override
public int hashCode() {
- return RandomUtils.hashLong(aID) + 31 * RandomUtils.hashLong(bID);
+ return aID + 31 * bID;
}
-
+
@Override
public boolean equals(Object o) {
- if (o instanceof EntityEntityWritable) {
- EntityEntityWritable that = (EntityEntityWritable) o;
+ if (o instanceof IndexIndexWritable) {
+ IndexIndexWritable that = (IndexIndexWritable) o;
return (aID == that.getAID()) && (bID == that.getBID());
}
return false;
}
-
+
@Override
public String toString() {
return aID + "\t" + bID;
}
@Override
- public EntityEntityWritable clone() {
- return new EntityEntityWritable(aID, bID);
+ public IndexIndexWritable clone() {
+ return new IndexIndexWritable(aID, bID);
}
-
+
}
\ No newline at end of file
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java Wed Apr 28 04:37:19 2010
@@ -26,30 +26,35 @@ import org.apache.hadoop.mapred.MapReduc
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.function.LongFloatProcedure;
import org.apache.mahout.math.map.OpenLongFloatHashMap;
public final class PartialMultiplyReducer extends MapReduceBase implements
- Reducer<IntWritable,VectorOrPrefWritable,LongWritable, RandomAccessSparseVectorWritable> {
+ Reducer<IntWritable,VectorOrPrefWritable,LongWritable,VectorWritable> {
@Override
public void reduce(IntWritable key,
Iterator<VectorOrPrefWritable> values,
- final OutputCollector<LongWritable,RandomAccessSparseVectorWritable> output,
+ final OutputCollector<LongWritable,VectorWritable> output,
Reporter reporter) throws IOException {
+
OpenLongFloatHashMap savedValues = new OpenLongFloatHashMap();
Vector cooccurrenceColumn = null;
final int itemIndex = key.get();
final LongWritable userIDWritable = new LongWritable();
- final RandomAccessSparseVectorWritable vectorWritable = new RandomAccessSparseVectorWritable();
+ final VectorWritable vectorWritable = new VectorWritable();
+
while (values.hasNext()) {
+
VectorOrPrefWritable value = values.next();
if (value.getVector() == null) {
+
// Then this is a user-pref value
long userID = value.getUserID();
float preferenceValue = value.getValue();
+
if (cooccurrenceColumn == null) {
// Haven't seen the co-occurrencce column yet; save it
savedValues.put(userID, preferenceValue);
@@ -62,8 +67,12 @@ public final class PartialMultiplyReduce
vectorWritable.set(partialProduct);
output.collect(userIDWritable, vectorWritable);
}
+
} else {
+
+ // Then this is the column vector
cooccurrenceColumn = value.getVector();
+
final Vector theColumn = cooccurrenceColumn;
savedValues.forEachPair(new LongFloatProcedure() {
@Override
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Wed Apr 28 04:37:19 2010
@@ -19,6 +19,9 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.cli2.Option;
import org.apache.hadoop.conf.Configuration;
@@ -26,11 +29,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -39,12 +40,11 @@ import org.apache.hadoop.mapred.TextOutp
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.VectorWritable;
/**
* <p>Runs a completely distributed recommender job as a series of mapreduces.</p>
@@ -98,34 +98,44 @@ public final class RecommenderJob extend
String cooccurrencePath = tempDirPath + "/cooccurrence";
String parialMultiplyPath = tempDirPath + "/partialMultiply";
+ AtomicInteger currentPhase = new AtomicInteger();
+
JobConf itemIDIndexConf = prepareJobConf(
inputPath, itemIDIndexPath, TextInputFormat.class,
ItemIDIndexMapper.class, IntWritable.class, LongWritable.class,
ItemIDIndexReducer.class, IntWritable.class, LongWritable.class,
- MapFileOutputFormat.class);
- JobClient.runJob(itemIDIndexConf);
+ SequenceFileOutputFormat.class);
+ itemIDIndexConf.setClass("mapred.combiner.class", ItemIDIndexReducer.class, Reducer.class);
+ if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+ JobClient.runJob(itemIDIndexConf);
+ }
JobConf toUserVectorConf = prepareJobConf(
inputPath, userVectorPath, TextInputFormat.class,
ToItemPrefsMapper.class, LongWritable.class, booleanData ? LongWritable.class : EntityPrefWritable.class,
- ToUserVectorReducer.class, LongWritable.class, RandomAccessSparseVectorWritable.class,
+ ToUserVectorReducer.class, LongWritable.class, VectorWritable.class,
SequenceFileOutputFormat.class);
toUserVectorConf.setBoolean(BOOLEAN_DATA, booleanData);
- JobClient.runJob(toUserVectorConf);
+ if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+ JobClient.runJob(toUserVectorConf);
+ }
JobConf toCooccurrenceConf = prepareJobConf(
userVectorPath, cooccurrencePath, SequenceFileInputFormat.class,
- UserVectorToCooccurrenceMapper.class, IntWritable.class, EntityCountWritable.class,
- UserVectorToCooccurrenceReducer.class, IntWritable.class, RandomAccessSparseVectorWritable.class,
+ UserVectorToCooccurrenceMapper.class, IndexIndexWritable.class, IntWritable.class,
+ UserVectorToCooccurrenceReducer.class, IntWritable.class, VectorWritable.class,
SequenceFileOutputFormat.class);
- toCooccurrenceConf.setInt("io.sort.mb", 600);
+ setIOSort(toCooccurrenceConf);
toCooccurrenceConf.setClass("mapred.combiner.class", CooccurrenceCombiner.class, Reducer.class);
- JobClient.runJob(toCooccurrenceConf);
+ toCooccurrenceConf.setClass("mapred.partitioner.class", FirstIndexPartitioner.class, Partitioner.class);
+ if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+ JobClient.runJob(toCooccurrenceConf);
+ }
JobConf partialMultiplyConf = prepareJobConf(
cooccurrencePath, parialMultiplyPath, SequenceFileInputFormat.class,
CooccurrenceColumnWrapperMapper.class, IntWritable.class, VectorOrPrefWritable.class,
- PartialMultiplyReducer.class, LongWritable.class, RandomAccessSparseVectorWritable.class,
+ PartialMultiplyReducer.class, LongWritable.class, VectorWritable.class,
SequenceFileOutputFormat.class);
MultipleInputs.addInputPath(
partialMultiplyConf,
@@ -138,22 +148,38 @@ public final class RecommenderJob extend
if (usersFile != null) {
partialMultiplyConf.set(UserVectorSplitterMapper.USERS_FILE, usersFile);
}
- JobClient.runJob(partialMultiplyConf);
+ if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+ JobClient.runJob(partialMultiplyConf);
+ }
JobConf aggregateAndRecommendConf = prepareJobConf(
parialMultiplyPath, outputPath, SequenceFileInputFormat.class,
- IdentityMapper.class, LongWritable.class, RandomAccessSparseVectorWritable.class,
+ IdentityMapper.class, LongWritable.class, VectorWritable.class,
AggregateAndRecommendReducer.class, LongWritable.class, RecommendedItemsWritable.class,
TextOutputFormat.class);
- aggregateAndRecommendConf.setInt("io.sort.mb", 600);
+ setIOSort(aggregateAndRecommendConf);
aggregateAndRecommendConf.setClass("mapred.combiner.class", AggregateCombiner.class, Reducer.class);
aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath);
aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
- aggregateAndRecommendConf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
- JobClient.runJob(aggregateAndRecommendConf);
+ if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+ JobClient.runJob(aggregateAndRecommendConf);
+ }
return 0;
}
+
+ private static void setIOSort(JobConf conf) {
+ conf.setInt("io.sort.factor", 100);
+ conf.setInt("io.sort.mb", 1000);
+ String javaOpts = conf.get("mapred.child.java.opts");
+ if (javaOpts != null) {
+ Matcher m = Pattern.compile("Xmx([0-9]+)m").matcher(javaOpts);
+ if (m.matches()) {
+ int heapMB = Integer.parseInt(m.group(1));
+ conf.setInt("io.sort.mb", heapMB / 2);
+ }
+ }
+ }
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new RecommenderJob(), args);
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java Wed Apr 28 04:37:19 2010
@@ -19,19 +19,16 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
import java.util.Iterator;
-import java.util.PriorityQueue;
-import java.util.Queue;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
/**
* <h1>Input</h1>
@@ -49,34 +46,19 @@ import org.apache.mahout.math.Vector;
* {@link ItemIDIndexMapper#idToIndex(long)}. The mapping is remembered for later with a combination of
* {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}.
* </p>
- *
- * <p>
- * The number of non-default elements actually retained in the user vector is capped at
- * {@link #MAX_PREFS_CONSIDERED}.
- * </p>
- *
*/
public final class ToUserVectorReducer extends MapReduceBase implements
- Reducer<LongWritable,LongWritable,LongWritable, RandomAccessSparseVectorWritable> {
-
- public static final int MAX_PREFS_CONSIDERED = 20;
-
- private boolean booleanData;
-
- @Override
- public void configure(JobConf jobConf) {
- booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
- }
+ Reducer<LongWritable,LongWritable,LongWritable, VectorWritable> {
@Override
public void reduce(LongWritable userID,
Iterator<LongWritable> itemPrefs,
- OutputCollector<LongWritable,RandomAccessSparseVectorWritable> output,
+ OutputCollector<LongWritable,VectorWritable> output,
Reporter reporter) throws IOException {
if (!itemPrefs.hasNext()) {
return;
}
- RandomAccessSparseVector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+ Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
while (itemPrefs.hasNext()) {
LongWritable itemPref = itemPrefs.next();
int index = ItemIDIndexMapper.idToIndex(itemPref.get());
@@ -89,38 +71,7 @@ public final class ToUserVectorReducer e
userVector.set(index, value);
}
- if (!booleanData && userVector.getNumNondefaultElements() > MAX_PREFS_CONSIDERED) {
- double cutoff = findTopNPrefsCutoff(MAX_PREFS_CONSIDERED,
- userVector);
- RandomAccessSparseVector filteredVector = new RandomAccessSparseVector(Integer.MAX_VALUE,
- MAX_PREFS_CONSIDERED);
- Iterator<Vector.Element> it = userVector.iterateNonZero();
- while (it.hasNext()) {
- Vector.Element element = it.next();
- if (element.get() >= cutoff) {
- filteredVector.set(element.index(), element.get());
- }
- }
- userVector = filteredVector;
- }
-
- RandomAccessSparseVectorWritable writable = new RandomAccessSparseVectorWritable(userVector);
- output.collect(userID, writable);
- }
-
- private static double findTopNPrefsCutoff(int n, Vector userVector) {
- Queue<Double> topPrefValues = new PriorityQueue<Double>(n + 1);
- Iterator<Vector.Element> it = userVector.iterateNonZero();
- while (it.hasNext()) {
- double prefValue = it.next().get();
- if (topPrefValues.size() < n) {
- topPrefValues.add(prefValue);
- } else if (prefValue > topPrefValues.peek()) {
- topPrefValues.add(prefValue);
- topPrefValues.poll();
- }
- }
- return topPrefValues.peek();
+ output.collect(userID, new VectorWritable(userVector));
}
}
Copied: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java (from r937991, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/TupleWritable.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java?p2=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java&p1=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/TupleWritable.java&r1=937991&r2=938782&rev=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/TupleWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java Wed Apr 28 04:37:19 2010
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.mahout.cf.taste.hadoop.cooccurence;
+package org.apache.mahout.cf.taste.hadoop.item;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java Wed Apr 28 04:37:19 2010
@@ -32,11 +32,11 @@ import org.apache.hadoop.mapred.OutputCo
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.cf.taste.impl.common.FastIDSet;
import org.apache.mahout.common.FileLineIterable;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
public final class UserVectorSplitterMapper extends MapReduceBase implements
- Mapper<LongWritable,RandomAccessSparseVectorWritable,IntWritable,VectorOrPrefWritable> {
+ Mapper<LongWritable, VectorWritable,IntWritable,VectorOrPrefWritable> {
static final String USERS_FILE = "usersFile";
@@ -64,7 +64,7 @@ public final class UserVectorSplitterMap
@Override
public void map(LongWritable key,
- RandomAccessSparseVectorWritable value,
+ VectorWritable value,
OutputCollector<IntWritable,VectorOrPrefWritable> output,
Reporter reporter) throws IOException {
long userID = key.get();
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java Wed Apr 28 04:37:19 2010
@@ -26,31 +26,86 @@ import org.apache.hadoop.mapred.MapReduc
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
public final class UserVectorToCooccurrenceMapper extends MapReduceBase implements
- Mapper<LongWritable, RandomAccessSparseVectorWritable,IntWritable, EntityCountWritable> {
-
+ Mapper<LongWritable, VectorWritable,IndexIndexWritable,IntWritable> {
+
+ private static final int MAX_PREFS_CONSIDERED = 50;
+
+ private boolean outputGuardValue = true;
+ private final OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
+
@Override
public void map(LongWritable userID,
- RandomAccessSparseVectorWritable userVector,
- OutputCollector<IntWritable,EntityCountWritable> output,
+ VectorWritable userVectorWritable,
+ OutputCollector<IndexIndexWritable,IntWritable> output,
Reporter reporter) throws IOException {
- Iterator<Vector.Element> it = userVector.get().iterateNonZero();
- EntityCountWritable entityCount = new EntityCountWritable();
- IntWritable writable1 = new IntWritable();
+ Vector userVector = maybePruneUserVector(userVectorWritable.get());
+ countSeen(userVector);
+ Iterator<Vector.Element> it = userVector.iterateNonZero();
+ IndexIndexWritable entityEntity = new IndexIndexWritable();
+ IntWritable one = new IntWritable(1);
while (it.hasNext()) {
int index1 = it.next().index();
- writable1.set(index1);
- Iterator<Vector.Element> it2 = userVector.get().iterateNonZero();
+ Iterator<Vector.Element> it2 = userVector.iterateNonZero();
while (it2.hasNext()) {
int index2 = it2.next().index();
- entityCount.set(index2, 1);
- output.collect(writable1, entityCount);
+ if (index1 != index2) {
+ entityEntity.set(index1, index2);
+ output.collect(entityEntity, one);
+ }
}
}
+ // Guard value, output once, sorts after everything; will be dropped by combiner
+ if (outputGuardValue) {
+ output.collect(new IndexIndexWritable(Integer.MAX_VALUE, Integer.MAX_VALUE), one);
+ outputGuardValue = false;
+ }
+ }
+
+  /**
+   * Caps the work done for users with very many preferences. A vector with at most
+   * {@link #MAX_PREFS_CONSIDERED} non-default elements is returned untouched. Otherwise the
+   * elements whose index has been seen most often so far by this mapper instance (according to
+   * {@code indexCounts}) are zeroed out, in place, in the vector that was passed in.
+   *
+   * @param userVector user's preference vector; may be modified in place
+   * @return the same {@code userVector} instance, possibly with some elements set to 0.0
+   */
+  private Vector maybePruneUserVector(Vector userVector) {
+    if (userVector.getNumNondefaultElements() <= MAX_PREFS_CONSIDERED) {
+      return userVector;
+    }
+
+    // Histogram: "seen count" -> number of elements of this vector having that seen count.
+    OpenIntIntHashMap countCounts = new OpenIntIntHashMap();
+    int maxCount = 0;
+    Iterator<Vector.Element> it = userVector.iterateNonZero();
+    while (it.hasNext()) {
+      int count = indexCounts.get(it.next().index());
+      countCounts.adjustOrPutValue(count, 1, 1);
+      maxCount = Math.max(maxCount, count);
+    }
+
+    // Find the smallest seen-count cutoff at which the elements with counts 1..cutoff exceed the cap.
+    // The histogram (countCounts) must be consulted here, not indexCounts, which is keyed by item
+    // index. The search is bounded by the largest count observed so that it always terminates,
+    // e.g. when most elements have never been seen before.
+    // NOTE(review): relies on OpenIntIntHashMap.get() yielding 0 for an absent key -- confirm.
+    int resultingSizeAtCutoff = 0;
+    int cutoff = 0;
+    while (resultingSizeAtCutoff <= MAX_PREFS_CONSIDERED && cutoff < maxCount) {
+      cutoff++;
+      resultingSizeAtCutoff += countCounts.get(cutoff);
+    }
+
+    // Drop every element whose index has been seen more often than the cutoff.
+    Iterator<Vector.Element> it2 = userVector.iterateNonZero();
+    while (it2.hasNext()) {
+      Vector.Element e = it2.next();
+      if (indexCounts.get(e.index()) > cutoff) {
+        e.set(0.0);
+      }
+    }
+
+    return userVector;
+  }
+
+  /**
+   * Records one more sighting, in {@code indexCounts}, of every index that has a non-zero
+   * entry in the given vector.
+   *
+   * @param userVector vector whose non-zero element indices are counted
+   */
+  private void countSeen(Vector userVector) {
+    for (Iterator<Vector.Element> elements = userVector.iterateNonZero(); elements.hasNext();) {
+      indexCounts.adjustOrPutValue(elements.next().index(), 1, 1);
+    }
+  }
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java Wed Apr 28 04:37:19 2010
@@ -25,39 +25,53 @@ import org.apache.hadoop.mapred.MapReduc
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
-import org.apache.mahout.math.function.IntIntProcedure;
-import org.apache.mahout.math.map.OpenIntIntHashMap;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
public final class UserVectorToCooccurrenceReducer extends MapReduceBase implements
- Reducer<IntWritable, EntityCountWritable,IntWritable, RandomAccessSparseVectorWritable> {
+ Reducer<IndexIndexWritable,IntWritable,IntWritable,VectorWritable> {
+
+ private int lastItem1ID = Integer.MIN_VALUE;
+ private int lastItem2ID = Integer.MIN_VALUE;
+ private Vector cooccurrenceRow = null;
+ private int count = 0;
@Override
- public void reduce(IntWritable index1,
- Iterator<EntityCountWritable> index2s,
- OutputCollector<IntWritable,RandomAccessSparseVectorWritable> output,
+ public void reduce(IndexIndexWritable entityEntity,
+ Iterator<IntWritable> counts,
+ OutputCollector<IntWritable,VectorWritable> output,
Reporter reporter) throws IOException {
- OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
- while (index2s.hasNext()) {
- EntityCountWritable writable = index2s.next();
- int index = (int) writable.getID();
- int oldValue = indexCounts.get(index);
- indexCounts.put(index, oldValue + writable.getCount());
- }
+ int item1ID = entityEntity.getAID();
+ int item2ID = entityEntity.getBID();
- final RandomAccessSparseVector cooccurrenceRow =
- new RandomAccessSparseVector(Integer.MAX_VALUE, 1000);
- indexCounts.forEachPair(new IntIntProcedure() {
- @Override
- public boolean apply(int index, int count) {
- cooccurrenceRow.set(index, count);
- return true;
+ if (item1ID < lastItem1ID) {
+ throw new IllegalStateException();
+ }
+ if (item1ID == lastItem1ID) {
+ if (item2ID < lastItem2ID) {
+ throw new IllegalStateException();
}
- });
- output.collect(index1, new RandomAccessSparseVectorWritable(cooccurrenceRow));
+ if (item2ID == lastItem2ID) {
+ count += CooccurrenceCombiner.sum(counts);
+ } else {
+ if (cooccurrenceRow == null) {
+ cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE);
+ }
+ cooccurrenceRow.set(item2ID, count);
+ lastItem2ID = item2ID;
+ count = CooccurrenceCombiner.sum(counts);
+ }
+ } else {
+ if (cooccurrenceRow != null) {
+ output.collect(new IntWritable(lastItem1ID), new VectorWritable(cooccurrenceRow));
+ }
+ lastItem1ID = item1ID;
+ lastItem2ID = item2ID;
+ cooccurrenceRow = null;
+ count = CooccurrenceCombiner.sum(counts);
+ }
}
}
\ No newline at end of file
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java Wed Apr 28 04:37:19 2010
@@ -23,20 +23,19 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
public final class VectorOrPrefWritable implements Writable {
- private RandomAccessSparseVector vector;
+ private Vector vector;
private long userID;
private float value;
public VectorOrPrefWritable() {
}
- public VectorOrPrefWritable(RandomAccessSparseVector vector) {
+ public VectorOrPrefWritable(Vector vector) {
this.vector = vector;
}
@@ -57,7 +56,7 @@ public final class VectorOrPrefWritable
return value;
}
- public void set(RandomAccessSparseVector vector) {
+ public void set(Vector vector) {
this.vector = vector;
this.userID = Long.MIN_VALUE;
this.value = Float.NaN;
@@ -77,7 +76,7 @@ public final class VectorOrPrefWritable
out.writeFloat(value);
} else {
out.writeBoolean(true);
- new RandomAccessSparseVectorWritable(vector).write(out);
+ new VectorWritable(vector).write(out);
}
}
@@ -85,9 +84,9 @@ public final class VectorOrPrefWritable
public void readFields(DataInput in) throws IOException {
boolean hasVector = in.readBoolean();
if (hasVector) {
- RandomAccessSparseVectorWritable writable = new RandomAccessSparseVectorWritable();
+ VectorWritable writable = new VectorWritable();
writable.readFields(in);
- set((RandomAccessSparseVector) writable.get());
+ set(writable.get());
} else {
long theUserID = in.readLong();
float theValue = in.readFloat();
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Wed Apr 28 04:37:19 2010
@@ -20,6 +20,7 @@ package org.apache.mahout.common;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli2.Argument;
import org.apache.commons.cli2.CommandLine;
@@ -93,18 +94,25 @@ public abstract class AbstractJob extend
argBuilder = argBuilder.withDefault(defaultValue);
}
Argument arg = argBuilder.create();
- return new DefaultOptionBuilder().withLongName(name).withRequired(required).withShortName(shortName)
- .withArgument(arg).withDescription(description).create();
+ DefaultOptionBuilder optBuilder = new DefaultOptionBuilder().withLongName(name).withRequired(required)
+ .withArgument(arg).withDescription(description);
+ if (shortName != null) {
+ optBuilder = optBuilder.withShortName(shortName);
+ }
+ return optBuilder.create();
}
protected static Map<String,String> parseArguments(String[] args, Option... extraOpts) {
- Option tempDirOpt = buildOption("tempDir", "t", "Intermediate output directory", "temp");
+ Option tempDirOpt = buildOption("tempDir", null, "Intermediate output directory", "temp");
Option helpOpt = DefaultOptionCreator.helpOption();
+ Option startPhase = buildOption("startPhase", null, "First phase to run", "0");
+ Option endPhase = buildOption("endPhase", null, "Last phase to run", String.valueOf(Integer.MAX_VALUE));
GroupBuilder gBuilder = new GroupBuilder().withName("Options")
.withOption(tempDirOpt)
- .withOption(helpOpt);
+ .withOption(helpOpt)
+ .withOption(startPhase).withOption(endPhase);
for (Option opt : extraOpts) {
gBuilder = gBuilder.withOption(opt);
@@ -143,6 +151,14 @@ public abstract class AbstractJob extend
}
}
}
+
+ /**
+ * Advances the phase counter and reports whether the phase it pointed at should run.
+ * The value held by {@code currentPhase} before the increment is compared against the
+ * {@code --startPhase} and {@code --endPhase} entries of {@code args}; a bound whose entry
+ * is absent ({@code null}) is not applied.
+ *
+ * @param args parsed arguments, looked up under the keys "--startPhase" and "--endPhase"
+ * @param currentPhase phase counter; always incremented by one, whatever the result
+ * @return {@code false} if the phase is below the start bound or above the end bound,
+ * {@code true} otherwise
+ * @throws NumberFormatException if a bound that gets evaluated is not a parsable integer
+ */
+ protected static boolean shouldRunNextPhase(Map<String,String> args, AtomicInteger currentPhase) {
+ // getAndIncrement: test the pre-increment value, so the first call checks the counter's initial value
+ int phase = currentPhase.getAndIncrement();
+ String startPhase = args.get("--startPhase");
+ String endPhase = args.get("--endPhase");
+ // Skip when phase < startPhase or phase > endPhase; both bounds are inclusive.
+ return !((startPhase != null && phase < Integer.parseInt(startPhase)) ||
+ (endPhase != null && phase > Integer.parseInt(endPhase)));
+ }
protected JobConf prepareJobConf(String inputPath,
String outputPath,
@@ -172,11 +188,7 @@ public abstract class AbstractJob extend
jobConf.setClass("mapred.reducer.class", reducer, Reducer.class);
jobConf.setClass("mapred.output.key.class", reducerKey, Writable.class);
jobConf.setClass("mapred.output.value.class", reducerValue, Writable.class);
- if (jobConf.get("mapred.output.compress") == null) {
- jobConf.setBoolean("mapred.output.compress", true);
- // otherwise leave it to its default value
- }
- jobConf.setCompressMapOutput(true);
+ jobConf.setBoolean("mapred.compress.map.output", true);
String customJobName = jobConf.get("mapred.job.name");
if (customJobName == null) {