Posted to commits@mahout.apache.org by ro...@apache.org on 2009/10/19 00:19:57 UTC

svn commit: r826539 [1/2] - in /lucene/mahout/trunk: core/src/main/java/org/apache/mahout/common/ core/src/main/java/org/apache/mahout/common/cache/ core/src/main/java/org/apache/mahout/fpm/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ core/src/...

Author: robinanil
Date: Sun Oct 18 22:19:56 2009
New Revision: 826539

URL: http://svn.apache.org/viewvc?rev=826539&view=rev
Log:
MAHOUT-157 Parallel Frequent Pattern Mining using FPGrowth

Added:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPGrowth.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTree.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTreeDepthCache.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FrequentPatternMaxHeap.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/Pattern.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/package.html
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthTest.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthJob.java
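
For illustration only (not part of this commit), a minimal sketch of how the new
driver might be invoked from user code. It assumes a Parameters instance can be
obtained via Parameters.fromString("") and populated with set(), mirroring how
the mappers rebuild it in setup(); the paths are placeholders. The bundled entry
point is the FPGrowthJob example added above.

    import java.util.List;
    import org.apache.mahout.common.Pair;
    import org.apache.mahout.common.Parameters;
    import org.apache.mahout.fpm.pfpgrowth.PFPGrowth;
    import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;

    public class PFPGrowthExample {
      public static void main(String[] args) throws Exception {
        // Assumption: Parameters.fromString("") yields an empty parameter set.
        Parameters params = Parameters.fromString("");
        params.set("input", "/path/to/transactions");  // placeholder path
        params.set("output", "/path/to/patterns");     // placeholder path
        params.set("minSupport", "3");                 // defaults used in this commit
        params.set("maxHeapSize", "50");
        params.set("numGroups", "50");

        // Runs the four stages in order: parallel counting, grouping,
        // parallel FPGrowth and aggregation.
        PFPGrowth.runPFPGrowth(params);

        // Read back the aggregated top K patterns per feature.
        List<Pair<String, TopKStringPatterns>> patterns =
            PFPGrowth.readFrequentPattern(params);
        for (Pair<String, TopKStringPatterns> entry : patterns) {
          System.out.println(entry.getFirst() + " => " + entry.getSecond());
        }
      }
    }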

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * An ordered list of Integers which can be used as a key or value in a Hadoop
+ * Map/Reduce job
+ */
+public class IntegerTuple implements Writable, WritableComparable<IntegerTuple> {
+
+  private List<Integer> tuple = new ArrayList<Integer>();
+
+  public IntegerTuple() {
+  }
+
+  public IntegerTuple(Integer firstEntry) {
+    add(firstEntry);
+  }
+  
+  public IntegerTuple(Collection<Integer> entries) {
+    for(Integer entry: entries)
+      add(entry);
+  }
+  
+  public IntegerTuple(Integer[] entries) {
+    for(Integer entry: entries)
+      add(entry);
+  }
+
+  /**
+   * Adds an entry to the end of the list
+   * 
+   * @param entry
+   * @return true if the entry was added
+   */
+  public boolean add(Integer entry) {
+    return tuple.add(entry);
+  }
+
+  /**
+   * Fetches the integer at the given location
+   * 
+   * @param index
+   * @return Integer value at the given location in the tuple list
+   */
+  public Integer integerAt(int index) {
+    return tuple.get(index);
+  }
+
+  /**
+   * Replaces the integer at the given index with the given newInteger
+   * 
+   * @param index
+   * @param newInteger
+   * @return The previous value at that location
+   */
+  public Integer replaceAt(int index, Integer newInteger) {
+    return tuple.set(index, newInteger);
+  }
+
+  /**
+   * Fetch the list of entries from the tuple
+   * 
+   * @return a List containing the integers in the order of insertion
+   */
+  public List<Integer> getEntries() {
+    return Collections.unmodifiableList(this.tuple);
+  }
+
+  /**
+   * Returns the length of the tuple
+   * 
+   * @return length
+   */
+  public int length() {
+    return this.tuple.size();
+  }
+
+  @Override
+  public String toString() {
+    return tuple.toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return tuple.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    IntegerTuple other = (IntegerTuple) obj;
+    if (tuple == null) {
+      if (other.tuple != null)
+        return false;
+    } else if (!tuple.equals(other.tuple))
+      return false;
+    return true;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int len = in.readInt();
+    tuple = new ArrayList<Integer>(len);
+    for (int i = 0; i < len; i++) {
+      int data = in.readInt();
+      tuple.add(data);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(tuple.size());
+    for (Integer entry : tuple) {     
+      out.writeInt(entry);     
+    }
+  }
+
+  @Override
+  public int compareTo(IntegerTuple otherTuple) {
+    int min = Math.min(this.length(), otherTuple.length());
+    for (int i = 0; i < min; i++) {
+      int ret = this.tuple.get(i).compareTo(otherTuple.integerAt(i));
+      if (ret == 0)
+        continue;
+      return ret;
+    }
+    return this.length() - otherTuple.length();
+  }
+
+}
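
A minimal round-trip sketch for the Writable added above; the byte-stream
plumbing is only for illustration.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.mahout.common.IntegerTuple;

    public class IntegerTupleDemo {
      public static void main(String[] args) throws Exception {
        IntegerTuple original = new IntegerTuple(new Integer[] {3, 1, 4});

        // Serialize through Writable.write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize through Writable.readFields()
        IntegerTuple copy = new IntegerTuple();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getEntries());        // [3, 1, 4]
        System.out.println(original.compareTo(copy)); // 0: element-wise, then by length
      }
    }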

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class StringRecordIterator implements Iterator<List<String>> {
+
+  private final Iterator<String> lineIterator;
+  private final Pattern splitter;
+
+  public StringRecordIterator(FileLineIterable iterable, String pattern) {
+    this.lineIterator = iterable.iterator();
+    this.splitter = Pattern.compile(pattern);
+  }
+  @Override
+  public boolean hasNext() {
+    return lineIterator.hasNext();
+  }
+
+  @Override
+  public List<String> next() {
+    String line = lineIterator.next();
+    String[] items = splitter.split(line);
+    return Arrays.asList(items);
+  }
+
+  @Override
+  public void remove() {
+    lineIterator.remove();
+  }
+
+}
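
A small usage sketch; it assumes FileLineIterable (an existing Mahout utility,
not part of this commit) can be built from a java.io.File, and the file name is
a placeholder. A line such as "milk,bread,eggs" comes back as the list
[milk, bread, eggs].

    import java.io.File;

    import org.apache.mahout.common.FileLineIterable;
    import org.apache.mahout.common.StringRecordIterator;

    public class StringRecordIteratorDemo {
      public static void main(String[] args) throws Exception {
        // Assumption: FileLineIterable offers a File constructor.
        StringRecordIterator records = new StringRecordIterator(
            new FileLineIterable(new File("transactions.txt")), ",");
        while (records.hasNext()) {
          System.out.println(records.next()); // e.g. [milk, bread, eggs]
        }
      }
    }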

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.cache;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+public class LeastKCache<K extends Comparable<? super K>, V> implements
+    Cache<K, V> {
+
+  private int capacity = 0;
+
+  private Map<K, V> cache = null;
+
+  private PriorityQueue<K> queue = null;
+
+  public LeastKCache(final int capacity) {
+
+    this.capacity = capacity;
+
+    cache = new HashMap<K, V>(capacity);
+    queue = new PriorityQueue<K>(capacity, new Comparator<K>() {
+
+      @Override
+      public int compare(K o1, K o2) {
+        return o2.compareTo(o1);
+      }
+
+    });
+
+  }
+
+  @Override
+  final public long capacity() {
+    return capacity;
+  }
+
+  @Override
+  final public V get(K key) {
+    return cache.get(key);
+  }
+
+  @Override
+  final public void set(K key, V value) {
+    if (!contains(key)) {
+      queue.add(key);
+    }
+    cache.put(key, value);
+    while (queue.size() > capacity) {
+      K k = queue.poll();
+      cache.remove(k);
+    }
+  }
+
+  @Override
+  final public long size() {
+    return cache.size();
+  }
+
+  @Override
+  final public boolean contains(K key) {
+    return (cache.containsKey(key));
+  }
+
+}
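
A quick behavioural sketch: the reversed comparator makes the internal
PriorityQueue evict the largest keys first, so only the K smallest keys survive
once the capacity is exceeded.

    import org.apache.mahout.common.cache.LeastKCache;

    public class LeastKCacheDemo {
      public static void main(String[] args) {
        LeastKCache<Integer, String> cache = new LeastKCache<Integer, String>(2);
        cache.set(1, "one");
        cache.set(3, "three");
        cache.set(2, "two"); // capacity exceeded: key 3, the largest, is evicted

        System.out.println(cache.contains(3)); // false
        System.out.println(cache.get(1));      // one
        System.out.println(cache.size());      // 2
      }
    }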

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 
+ * {@link AggregatorMapper} outputs each pattern once per item it contains, so
+ * that the reducer can group the patterns by item and select the top K
+ * frequent patterns for that item
+ * 
+ */
+public class AggregatorMapper extends
+    Mapper<Text, TopKStringPatterns, Text, TopKStringPatterns> {
+
+  @Override
+  protected void map(Text key, TopKStringPatterns values, Context context)
+      throws IOException, InterruptedException {
+    for (Pair<List<String>, Long> pattern : values.getPatterns()) {
+      for (String item : pattern.getFirst()) {
+        List<Pair<List<String>, Long>> patternSingularList = new ArrayList<Pair<List<String>, Long>>();
+        patternSingularList.add(pattern);
+        context.setStatus("Aggregator Mapper: Grouping Patterns for " + item);
+        context.write(new Text(item), 
+            new TopKStringPatterns(patternSingularList));
+      }
+    }
+
+  }
+}
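
For example, an incoming value holding the single pattern ([milk, bread], 42)
is re-emitted twice by the loop above, once under the key "milk" and once under
"bread", each wrapped in its own one-element TopKStringPatterns, so the reducer
sees every pattern under every item it mentions.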

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * 
+ * {@link AggregatorReducer} groups all Frequent Patterns containing an item and
+ * outputs the top K patterns containing that particular item
+ * 
+ */
+public class AggregatorReducer extends
+    Reducer<Text, TopKStringPatterns, Text, TopKStringPatterns> {
+
+  private int maxHeapSize = 50;
+
+  @Override
+  protected void reduce(Text key, Iterable<TopKStringPatterns> values,
+      Context context) throws IOException, InterruptedException {
+    TopKStringPatterns patterns = new TopKStringPatterns();
+    Iterator<TopKStringPatterns> it = values.iterator();
+    while (it.hasNext()) {
+      context.setStatus("Aggregator Reducer: Selecting TopK patterns for: " + key);
+      patterns = patterns.merge(it.next(), maxHeapSize);
+    }
+    context.write(key, patterns);
+
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
+    super.setup(context);
+    Parameters params = Parameters.fromString(context.getConfiguration().get(
+        "pfp.parameters", ""));
+    maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50"));
+
+  }
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.mahout.common.IntegerTuple;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * Parallel FP Growth Driver Class. Runs each stage of PFPGrowth as described in
+ * the paper http://infolab.stanford.edu/~echang/recsys08-69.pdf
+ * 
+ */
+public class PFPGrowth {
+  private static final Logger log = LoggerFactory.getLogger(PFPGrowth.class);
+
+  public static final Pattern SPLITTER = Pattern.compile("[ ,\t]*[,|\t][ ,\t]*");
+
+  /**
+   * 
+   * @param params should contain input and output locations as string values;
+   *        additional parameters include minSupport(3), maxHeapSize(50) and
+   *        numGroups(1000)
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  public static void runPFPGrowth(Parameters params) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    startParallelCounting(params);
+    startGroupingItems(params);
+    startParallelFPGrowth(params);
+    startAggregating(params);
+  }
+
+  /**
+   * Converts a given Map into a String using Hadoop's DefaultStringifier
+   * 
+   * @param map
+   * @param conf
+   * @return string representation of the map
+   * @throws IOException
+   */
+  private static String serializeMap(Map<String, Long> map, Configuration conf)
+      throws IOException {
+    conf
+        .set(
+            "io.serializations",
+            "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
+    DefaultStringifier<Map<String, Long>> mapStringifier = new DefaultStringifier<Map<String, Long>>(
+        conf, GenericsUtil.getClass(map));
+    String serializedMapString = mapStringifier.toString(map);
+    return serializedMapString;
+  }
+
+  /**
+   * Generates the gList (group ID mapping of the frequent features) from its
+   * corresponding serialized representation
+   * 
+   * @param params
+   * @param key
+   * @param conf
+   * @return the deserialized gList map
+   * @throws IOException
+   */
+  public static Map<String, Long> deserializeMap(Parameters params, String key,
+      Configuration conf) throws IOException {
+    Map<String, Long> map = new HashMap<String, Long>();
+    conf
+        .set(
+            "io.serializations",
+            "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
+
+    DefaultStringifier<Map<String, Long>> mapStringifier = new DefaultStringifier<Map<String, Long>>(
+        conf, GenericsUtil.getClass(map));
+    String gListString = mapStringifier.toString(map);
+    gListString = params.get(key, gListString);
+    map = mapStringifier.fromString(gListString);
+    return map;
+  }
+
+  /**
+   * Serializes the fList and returns the string representation of the List
+   * 
+   * @param list
+   * @param conf
+   * @return the serialized string representation of the fList
+   * @throws IOException
+   */
+  private static String serializeList(List<Pair<String, Long>> list,
+      Configuration conf) throws IOException {
+    conf
+        .set(
+            "io.serializations",
+            "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
+    DefaultStringifier<List<Pair<String, Long>>> listStringifier = new DefaultStringifier<List<Pair<String, Long>>>(
+        conf, GenericsUtil.getClass(list));
+    String serializedListString = listStringifier.toString(list);
+    return serializedListString;
+  }
+
+  /**
+   * Generates the fList from the serialized string representation
+   * 
+   * @param params
+   * @param key
+   * @param conf
+   * @return the deserialized fList
+   * @throws IOException
+   */
+  public static List<Pair<String, Long>> deserializeList(Parameters params,
+      String key, Configuration conf) throws IOException {
+    List<Pair<String, Long>> list = new ArrayList<Pair<String, Long>>();
+    conf
+        .set(
+            "io.serializations",
+            "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
+
+    DefaultStringifier<List<Pair<String, Long>>> listStringifier = new DefaultStringifier<List<Pair<String, Long>>>(
+        conf, GenericsUtil.getClass(list));
+    String serializedString = listStringifier.toString(list);
+    serializedString = params.get(key, serializedString);
+    list = listStringifier.fromString(serializedString);
+    return list;
+  }
+
+  /**
+   * Count the frequencies of various features in parallel using Map/Reduce
+   * 
+   * @param params
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  public static void startParallelCounting(Parameters params)
+      throws IOException, InterruptedException, ClassNotFoundException {
+
+    Configuration conf = new Configuration();
+    conf.set("pfp.parameters", params.toString());
+
+    String input = params.get("input");
+    Job job = new Job(conf, "Parallel Counting Driver running over input: "
+        + input);
+    job.setJarByClass(PFPGrowth.class);
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(LongWritable.class);
+
+    FileInputFormat.addInputPath(job, new Path(input));
+    Path outPath = new Path(params.get("output") + "/parallelcounting");
+    FileOutputFormat.setOutputPath(job, outPath);
+
+    FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
+    if (dfs.exists(outPath)) {
+      dfs.delete(outPath, true);
+    }
+
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setMapperClass(ParallelCountingMapper.class);
+    job.setCombinerClass(ParallelCountingReducer.class);
+    job.setReducerClass(ParallelCountingReducer.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+    job.waitForCompletion(true);
+
+  }
+
+  /**
+   * Reads the feature frequency List which is built at the end of the Parallel
+   * Counting job
+   * 
+   * @param params
+   * @return the feature frequency list, sorted by descending frequency
+   * @throws IOException
+   */
+  public static List<Pair<String, Long>> readFList(Parameters params)
+      throws IOException {
+    Writable key = new Text();
+    LongWritable value = new LongWritable();
+    int minSupport = Integer.valueOf(params.get("minSupport", "3"));
+    Configuration conf = new Configuration();
+
+    FileSystem fs = FileSystem.get(new Path(params.get("output")
+        + "/parallelcounting").toUri(), conf);
+    FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output")
+        + "/parallelcounting/part-*"));
+
+    PriorityQueue<Pair<String, Long>> queue = new PriorityQueue<Pair<String, Long>>(
+        11, new Comparator<Pair<String, Long>>() {
+
+          @Override
+          public int compare(Pair<String, Long> o1, Pair<String, Long> o2) {
+            int ret = o2.getSecond().compareTo(o1.getSecond());
+            if (ret != 0)
+              return ret;
+            return o1.getFirst().compareTo(o2.getFirst());
+          }
+
+        });
+    for (FileStatus fileStatus : outputFiles) {
+      Path path = fileStatus.getPath();
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+      // key is feature value is count
+      while (reader.next(key, value)) {
+        if (value.get() < minSupport)
+          continue;
+        queue.add(new Pair<String, Long>(key.toString(), value.get()));
+      }
+    }
+    List<Pair<String, Long>> fList = new ArrayList<Pair<String, Long>>();
+    while (!queue.isEmpty())
+      fList.add(queue.poll());
+    return fList;
+  }
+
+  /**
+   * Reads the Frequent Patterns generated by the aggregation job
+   * 
+   * @param params
+   * @return List of TopK patterns for each string frequent feature
+   * @throws IOException
+   */
+  public static List<Pair<String, TopKStringPatterns>> readFrequentPattern(
+      Parameters params) throws IOException {
+
+    Configuration conf = new Configuration();
+
+    FileSystem fs = FileSystem.get(new Path(params.get("output")
+        + "/frequentPatterns").toUri(), conf);
+    FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output")
+        + "/frequentPatterns/part-*"));
+
+    List<Pair<String, TopKStringPatterns>> ret = new ArrayList<Pair<String, TopKStringPatterns>>();
+    for (FileStatus fileStatus : outputFiles) {
+      Path path = fileStatus.getPath();
+      ret.addAll(FPGrowth.readFrequentPattern(fs, conf, path));
+    }
+    return ret;
+  }
+
+  /**
+   * Group the given Features into g groups as defined by the numGroups
+   * parameter in params
+   * 
+   * @param params
+   * @throws IOException
+   */
+  public static void startGroupingItems(Parameters params) throws IOException {
+    Configuration conf = new Configuration();
+    List<Pair<String, Long>> fList = readFList(params);
+    Integer numGroups = Integer.valueOf(params.get("numGroups", "50"));
+
+    Map<String, Long> gList = new HashMap<String, Long>();
+    long groupID = 0;
+    long i = 0;
+    long maxPerGroup = fList.size() / numGroups;
+    if (fList.size() != maxPerGroup * numGroups)
+      maxPerGroup = maxPerGroup + 1;
+
+    for (Pair<String, Long> featureFreq : fList) {
+      String feature = featureFreq.getFirst();
+      if (i / (maxPerGroup) == groupID) {
+        gList.put(feature, groupID);
+      } else {
+        groupID++;
+        gList.put(feature, groupID);
+      }
+      i++;
+    }
+
+    log.info("Number of Features: {}", fList.size());
+
+    params.set("gList", serializeMap(gList, conf));
+    params.set("fList", serializeList(fList, conf));
+  }
+
+  /**
+   * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K frequent
+   * patterns of the group dependent shards
+   * 
+   * @param params
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  public static void startParallelFPGrowth(Parameters params)
+      throws IOException, InterruptedException, ClassNotFoundException {
+
+    Configuration conf = new Configuration();
+    conf.set("pfp.parameters", params.toString());
+
+    String input = params.get("input");
+    Job job = new Job(conf, "PFP Growth Driver running over input: " + input);
+    job.setJarByClass(PFPGrowth.class);
+
+    job.setMapOutputKeyClass(LongWritable.class);
+    job.setMapOutputValueClass(IntegerTuple.class);
+    
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(TopKStringPatterns.class);
+
+    FileInputFormat.addInputPath(job, new Path(input));
+    Path outPath = new Path(params.get("output") + "/fpgrowth");
+    FileOutputFormat.setOutputPath(job, outPath);
+
+    FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
+    if (dfs.exists(outPath)) {
+      dfs.delete(outPath, true);
+    }
+
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setMapperClass(ParallelFPGrowthMapper.class);
+    job.setReducerClass(ParallelFPGrowthReducer.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+    job.waitForCompletion(true);
+  }
+
+  /**
+   * Run the aggregation Job to aggregate the different TopK patterns, group
+   * each Pattern by the features present in it, and thus calculate the final
+   * Top K frequent Patterns for each feature
+   * 
+   * @param params
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  public static void startAggregating(Parameters params) throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    Configuration conf = new Configuration();
+    params.set("fList", "");
+    params.set("gList", "");
+    conf.set("pfp.parameters", params.toString());
+
+    String input = params.get("output") + "/fpgrowth";
+    Job job = new Job(conf, "PFP Aggregator Driver running over input: "
+        + input);
+    job.setJarByClass(PFPGrowth.class);
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(TopKStringPatterns.class);
+
+    FileInputFormat.addInputPath(job, new Path(input));
+    Path outPath = new Path(params.get("output") + "/frequentPatterns");
+    FileOutputFormat.setOutputPath(job, outPath);
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setMapperClass(AggregatorMapper.class);
+    job.setCombinerClass(AggregatorReducer.class);
+    job.setReducerClass(AggregatorReducer.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+    FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
+    if (dfs.exists(outPath)) {
+      dfs.delete(outPath, true);
+    }
+    job.waitForCompletion(true);
+  }
+}
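
The default SPLITTER above treats commas, tabs and pipe characters, together
with any surrounding spaces, commas or tabs, as delimiters; a quick check of
its behaviour:

    import java.util.Arrays;
    import java.util.regex.Pattern;

    public class SplitterDemo {
      public static void main(String[] args) {
        // Same regular expression as PFPGrowth.SPLITTER.
        Pattern splitter = Pattern.compile("[ ,\t]*[,|\t][ ,\t]*");
        System.out.println(Arrays.toString(splitter.split("milk, bread\teggs")));
        // prints [milk, bread, eggs]
      }
    }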

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.common.Parameters;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/**
+ * 
+ * {@link ParallelCountingMapper} emits each item in a transaction with a count
+ * of one, in the same way as the Hadoop WordCount example
+ * 
+ */
+public class ParallelCountingMapper extends
+    Mapper<LongWritable, Text, Text, LongWritable> {
+
+  private static final LongWritable one = new LongWritable(1);
+
+  private Pattern splitter = null;
+
+  @Override
+  protected void map(LongWritable offset, Text input, Context context)
+      throws IOException, InterruptedException {
+
+    String[] items = splitter.split(input.toString());
+    for (String item : items) {
+      if (item.trim().length() == 0) continue;
+      context.setStatus("Parallel Counting Mapper: " + item);
+      context.write(new Text(item), one);
+    }
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
+    super.setup(context);
+    Parameters params = Parameters.fromString(context.getConfiguration().get(
+        "pfp.parameters", ""));
+    splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER
+        .toString()));
+  }
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * {@link ParallelCountingReducer} sums up the item counts and outputs the item
+ * and its total count. A simple summing reducer, it can also be used as a
+ * local Combiner
+ */
+public class ParallelCountingReducer extends
+    Reducer<Text, LongWritable, Text, LongWritable> {
+
+  @Override
+  protected void reduce(Text key, Iterable<LongWritable> values, Context context)
+      throws IOException, InterruptedException {
+    long sum = 0;
+    Iterator<LongWritable> it = values.iterator();
+    while (it.hasNext()) {
+      context.setStatus("Parallel Counting Reducer: " + key);
+      sum += it.next().get();
+    }
+    context.setStatus("Parallel Counting Reducer: " + key + " => " + sum);
+    context.write(key, new LongWritable(sum));
+
+  }
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.common.IntegerTuple;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.Parameters;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.regex.Pattern;
+
+/**
+ * {@link ParallelFPGrowthMapper} maps each transaction to all unique item
+ * groups present in the transaction. The mapper outputs the group id as the
+ * key and the group dependent transaction as the value
+ * 
+ */
+public class ParallelFPGrowthMapper extends
+    Mapper<LongWritable, Text, LongWritable, IntegerTuple> {
+
+  private Map<String, Integer> fMap = new HashMap<String, Integer>();
+
+  private Map<Integer, Long> gListInt = new HashMap<Integer, Long>();
+
+  private Pattern splitter = null;
+
+  @Override
+  protected void map(LongWritable offset, Text input, Context context)
+      throws IOException, InterruptedException {
+
+    String[] items = splitter.split(input.toString());
+
+    List<Integer> itemSet = new ArrayList<Integer>();
+    for (String item : items) { // remove items not in the fList
+      if (fMap.containsKey(item) && item.trim().length() != 0)
+        itemSet.add(fMap.get(item));
+    }
+
+    Collections.sort(itemSet);
+
+    Integer[] prunedItems = itemSet.toArray(new Integer[itemSet.size()]);
+    
+    Set<Long> groups = new HashSet<Long>();
+    // generate group dependent shards
+    for (int j = prunedItems.length - 1; j >= 0; j--) {
+      Integer item = prunedItems[j];
+      Long groupID = gListInt.get(item);
+      if (!groups.contains(groupID)) {
+        Integer[] tempItems = new Integer[j + 1];
+        System.arraycopy(prunedItems, 0, tempItems, 0, j + 1);
+        context
+            .setStatus("Parallel FPGrowth: Generating Group Dependent transactions for: "
+                + item);
+        context.write(new LongWritable(groupID), new IntegerTuple(tempItems));
+      }
+      groups.add(groupID);
+    }
+
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
+    super.setup(context);
+    Parameters params = Parameters.fromString(context.getConfiguration().get(
+        "pfp.parameters", ""));
+
+    int i = 0;
+    for (Pair<String, Long> e : PFPGrowth.deserializeList(params, "fList",
+        context.getConfiguration())) {
+      fMap.put(e.getFirst(), i++);
+    }
+    
+    for (Entry<String, Long> e : PFPGrowth.deserializeMap(params, "gList",
+        context.getConfiguration()).entrySet()) {
+      gListInt.put(fMap.get(e.getKey()), e.getValue());
+    }
+    splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER
+        .toString()));
+
+  }
+}
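
As a worked example of the shard loop above: for a pruned, sorted transaction
[0, 3, 7] whose items fall in groups g(0)=0, g(3)=1 and g(7)=2, the mapper
emits (2, [0, 3, 7]), (1, [0, 3]) and (0, [0]); when several items share a
group, only the longest prefix ending at that group is written.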

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.IntegerTuple;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.fpm.pfpgrowth.convertors.ContextWriteOutputCollector;
+import org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerStringOutputConvertor;
+import org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerTupleIterator;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPTreeDepthCache;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * {@link ParallelFPGrowthReducer} takes each group of transactions and runs
+ * Vanilla FPGrowth on it and outputs the Top K frequent Patterns for each
+ * group.
+ * 
+ */
+
+public class ParallelFPGrowthReducer extends
+    Reducer<LongWritable, IntegerTuple, Text, TopKStringPatterns> {
+
+  private List<Pair<Integer, Long>> fList = new ArrayList<Pair<Integer, Long>>();
+  
+  private List<String> featureReverseMap = new ArrayList<String>();
+  
+  private Map<String, Integer> fMap = new HashMap<String, Integer>();
+
+  private Map<Long, List<Integer>> groupFeatures = new HashMap<Long, List<Integer>>();
+
+  private int maxHeapSize = 50;
+
+  private int minSupport = 3;
+
+  @Override
+  public void reduce(LongWritable key, Iterable<IntegerTuple> values,
+      Context context) throws IOException {
+    FPGrowth<Integer> fpGrowth = new FPGrowth<Integer>();
+    fpGrowth
+        .generateTopKFrequentPatterns(
+            new IntegerTupleIterator(values.iterator()),
+            fList,
+            minSupport,
+            maxHeapSize,
+            new HashSet<Integer>(groupFeatures.get(key.get())),
+            new IntegerStringOutputConvertor(
+                new ContextWriteOutputCollector<LongWritable, IntegerTuple, Text, TopKStringPatterns>(
+                    context), featureReverseMap));
+  }
+
+  @Override
+  public void setup(Context context) throws IOException, InterruptedException {
+
+    super.setup(context);
+    Parameters params = Parameters.fromString(context.getConfiguration().get(
+        "pfp.parameters", ""));
+    
+
+    for (Pair<String, Long> e : PFPGrowth.deserializeList(params, "fList",
+        context.getConfiguration())) {
+      featureReverseMap.add(e.getFirst());
+      fMap.put(e.getFirst(), i);
+      fList.add(new Pair<Integer, Long>(i++, e.getSecond()));
+    }
+    
+    Map<String, Long> gList = PFPGrowth.deserializeMap(params, "gList", context
+        .getConfiguration());
+    
+    for (Entry<String, Long> entry : gList.entrySet()) {
+      List<Integer> groupList = groupFeatures.get(entry.getValue());
+      Integer itemInteger = fMap.get(entry.getKey());
+      if (groupList != null)
+        groupList.add(itemInteger);
+      else {
+        groupList = new ArrayList<Integer>();
+        groupList.add(itemInteger);
+        groupFeatures.put(entry.getValue(), groupList);
+      }
+
+    }
+    maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50"));
+    minSupport = Integer.valueOf(params.get("minSupport", "3"));
+    FPTreeDepthCache.FirstLevelCacheSize = Integer.valueOf(params
+        .get("treeCacheSize", Integer
+            .toString(FPTreeDepthCache.FirstLevelCacheSize)));
+  }
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ContextWriteOutputCollector<IK extends Writable, IV extends Writable, K extends Writable, V extends Writable>
+    implements OutputCollector<K, V> {
+
+  private static final Logger log = LoggerFactory
+      .getLogger(ContextWriteOutputCollector.class);
+
+  private Reducer<IK, IV, K, V>.Context context;
+
+  public ContextWriteOutputCollector(Reducer<IK, IV, K, V>.Context context)
+      throws IOException {
+    this.context = context;
+  }
+
+  @Override
+  public final void collect(K key, V value) throws IOException {
+    try {
+      context.setStatus("Writing Top K patterns for: " + key.toString());
+      context.write(key, value);
+    } catch (InterruptedException e) {
+      log.error("{}", e);
+    }
+  }
+
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.OutputCollector;
+
+public class SequenceFileOutputCollector<K extends Writable, V extends Writable>
+    implements OutputCollector<K, V> {
+  private SequenceFile.Writer writer;
+
+  public SequenceFileOutputCollector(SequenceFile.Writer writer)
+      throws IOException {
+    this.writer = writer;
+  }
+
+  @Override
+  public final void collect(K key, V value) throws IOException {
+    writer.append(key, value);
+  }
+
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FrequentPatternMaxHeap;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth.Pattern;
+
+final public class TopKPatternsOutputConvertor<AttributePrimitive> implements
+    OutputCollector<Integer, FrequentPatternMaxHeap> {
+
+  private OutputCollector<AttributePrimitive, List<Pair<List<AttributePrimitive>, Long>>> collector = null;
+
+  private Map<Integer, AttributePrimitive> reverseMapping = null;
+
+  public TopKPatternsOutputConvertor(
+      OutputCollector<AttributePrimitive, List<Pair<List<AttributePrimitive>, Long>>> collector,
+      Map<Integer, AttributePrimitive> reverseMapping) {
+    this.collector = collector;
+    this.reverseMapping = reverseMapping;
+  }
+
+  @Override
+  final public void collect(Integer key, FrequentPatternMaxHeap value)
+      throws IOException {
+    List<Pair<List<AttributePrimitive>, Long>> perAttributePatterns = new ArrayList<Pair<List<AttributePrimitive>, Long>>();
+    for (Pattern itemSet : value.getHeap()) {
+      List<AttributePrimitive> frequentPattern = new ArrayList<AttributePrimitive>();
+      for (int j = 0; j < itemSet.length(); j++) {
+        frequentPattern.add(reverseMapping.get(itemSet.getPattern()[j]));
+      }
+      Pair<List<AttributePrimitive>, Long> returnItemSet = new Pair<List<AttributePrimitive>, Long>(
+          frequentPattern, itemSet.getSupport());
+      perAttributePatterns.add(returnItemSet);
+    }
+    collector.collect(reverseMapping.get(key), perAttributePatterns);
+  }
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
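+/**
+ * Iterates over transactions of attributes, converting each transaction into
+ * an array of integer attribute ids and silently dropping attributes that
+ * have no entry in the id mapping.
+ */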
+public class TransactionIterator<AttributePrimitive> implements Iterator<int[]> {
+  private final Map<AttributePrimitive, Integer> attributeIdMapping;
+
+  private final Iterator<List<AttributePrimitive>> iterator;
+
+  private final int[] transactionBuffer;
+
+  public TransactionIterator(Iterator<List<AttributePrimitive>> iterator,
+      Map<AttributePrimitive, Integer> attributeIdMapping) {
+    this.attributeIdMapping = attributeIdMapping;
+    this.iterator = iterator;
+    transactionBuffer = new int[attributeIdMapping.size()];
+  }
+
+  @Override
+  public boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  @Override
+  public int[] next() {
+    List<AttributePrimitive> transaction = iterator.next();
+    int index = 0;
+    for (AttributePrimitive attribute : transaction) {
+      if (attributeIdMapping.containsKey(attribute)) {
+        transactionBuffer[index++] = attributeIdMapping.get(attribute);
+      }
+    }
+
+    int[] transactionList = new int[index];
+    System.arraycopy(transactionBuffer, 0, transactionList, 0, index);
+    return transactionList;
+  }
+
+  @Override
+  public void remove() {
+    iterator.remove();
+  }
+
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors.integer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+
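+/**
+ * Maps the integer feature ids in the collected patterns back to their String
+ * features via the reverse feature map and emits a {@link Text} key with a
+ * {@link TopKStringPatterns} value.
+ */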
+public final class IntegerStringOutputConvertor implements
+    OutputCollector<Integer, List<Pair<List<Integer>, Long>>> {
+
+  private final OutputCollector<Text, TopKStringPatterns> collector;
+
+  private final List<String> featureReverseMap;
+
+  public IntegerStringOutputConvertor(
+      OutputCollector<Text, TopKStringPatterns> collector,
+      List<String> featureReverseMap) {
+    this.collector = collector;
+    this.featureReverseMap = featureReverseMap;
+  }
+
+  @Override
+  public final void collect(Integer key, List<Pair<List<Integer>, Long>> value)
+      throws IOException {
+    String stringKey = featureReverseMap.get(key);
+    List<Pair<List<String>, Long>> stringValues = new ArrayList<Pair<List<String>, Long>>();
+    for (Pair<List<Integer>, Long> e : value) {
+      List<String> pattern = new ArrayList<String>();
+      for (Integer i : e.getFirst()) {
+        pattern.add(featureReverseMap.get(i));
+      }
+      stringValues.add(new Pair<List<String>, Long>(pattern, e.getSecond()));
+    }
+    collector.collect(new Text(stringKey), new TopKStringPatterns(stringValues));
+  }
+
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors.integer;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.mahout.common.IntegerTuple;
+
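+/**
+ * Adapts an iterator over {@link IntegerTuple}s into an iterator over their
+ * {@code List<Integer>} entries.
+ */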
+public final class IntegerTupleIterator implements Iterator<List<Integer>> {
+
+  private final Iterator<IntegerTuple> iterator;
+
+  public IntegerTupleIterator(Iterator<IntegerTuple> iterator) {
+    this.iterator = iterator;
+  }
+
+  @Override
+  public final boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  @Override
+  public final List<Integer> next() {
+    IntegerTuple transaction = iterator.next();
+    return transaction.getEntries();
+  }
+
+  @Override
+  public final void remove() {
+    iterator.remove();
+  }
+
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors.string;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.common.Pair;
+
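+/**
+ * Forwards each key and its list of (pattern, support) pairs to the wrapped
+ * collector as a {@link Text} key with a {@link TopKStringPatterns} value.
+ */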
+public final class StringOutputConvertor implements
+    OutputCollector<String, List<Pair<List<String>, Long>>> {
+
+  private final OutputCollector<Text, TopKStringPatterns> collector;
+
+  public StringOutputConvertor(
+      OutputCollector<Text, TopKStringPatterns> collector) {
+    this.collector = collector;
+  }
+
+  @Override
+  public final void collect(String key, List<Pair<List<String>, Long>> value)
+      throws IOException {
+    collector.collect(new Text(key), new TopKStringPatterns(value));
+  }
+
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors.string;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.mahout.common.StringTuple;
+
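+/**
+ * Adapts an iterator over {@link StringTuple}s into an iterator over their
+ * {@code List<String>} entries.
+ */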
+public final class StringTupleIterator implements Iterator<List<String>> {
+
+  private final Iterator<StringTuple> iterator;
+
+  public StringTupleIterator(Iterator<StringTuple> iterator) {
+    this.iterator = iterator;
+  }
+
+  @Override
+  public final boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  @Override
+  public final List<String> next() {
+    StringTuple transaction = iterator.next();
+    return transaction.getEntries();
+  }
+
+  @Override
+  public final void remove() {
+    iterator.remove();
+  }
+
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors.string;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+
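+/**
+ * A {@link Writable} list of (String pattern, support) pairs; patterns are
+ * expected to be ordered by descending support when two lists are merged.
+ */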
+public final class TopKStringPatterns implements Writable {
+  private final List<Pair<List<String>, Long>> frequentPatterns;
+
+  public TopKStringPatterns() {
+    frequentPatterns = new ArrayList<Pair<List<String>, Long>>();
+  }
+
+  public TopKStringPatterns(List<Pair<List<String>, Long>> patterns) {
+    frequentPatterns = new ArrayList<Pair<List<String>, Long>>(patterns);
+  }
+
+  public final Iterator<Pair<List<String>, Long>> iterator() {
+    return frequentPatterns.iterator();
+  }
+
+  public final List<Pair<List<String>, Long>> getPatterns() {
+    return frequentPatterns;
+  }
+
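+  /**
+   * Merges this list with another, both assumed sorted by descending support
+   * (ties broken by pattern length, then lexicographically), keeping at most
+   * heapSize patterns and collapsing identical patterns into one.
+   */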
+  public final TopKStringPatterns merge(TopKStringPatterns pattern, int heapSize) {
+    List<Pair<List<String>, Long>> patterns = new ArrayList<Pair<List<String>, Long>>();
+    Iterator<Pair<List<String>, Long>> myIterator = frequentPatterns.iterator();
+    Iterator<Pair<List<String>, Long>> otherIterator = pattern.iterator();
+    Pair<List<String>, Long> myItem = null;
+    Pair<List<String>, Long> otherItem = null;
+    for (int i = 0; i < heapSize; i++) {
+      if (myItem == null && myIterator.hasNext()) {
+        myItem = myIterator.next();
+      }
+      if (otherItem == null && otherIterator.hasNext()) {
+        otherItem = otherIterator.next();
+      }
+      if (myItem != null && otherItem != null) {
+        // Compare by support, then by pattern length, then lexicographically.
+        int cmp = myItem.getSecond().compareTo(otherItem.getSecond());
+        if (cmp == 0) {
+          cmp = myItem.getFirst().size() - otherItem.getFirst().size();
+          if (cmp == 0) {
+            for (int j = 0; j < myItem.getFirst().size(); j++) {
+              cmp = myItem.getFirst().get(j).compareTo(
+                  otherItem.getFirst().get(j));
+              if (cmp != 0) {
+                break;
+              }
+            }
+          }
+        }
+        if (cmp <= 0) {
+          patterns.add(otherItem);
+          if (cmp == 0) {
+            // Identical patterns: keep a single copy.
+            myItem = null;
+          }
+          otherItem = null;
+        } else {
+          patterns.add(myItem);
+          myItem = null;
+        }
+      } else if (myItem != null) {
+        patterns.add(myItem);
+        myItem = null;
+      } else if (otherItem != null) {
+        patterns.add(otherItem);
+        otherItem = null;
+      } else {
+        break;
+      }
+    }
+    return new TopKStringPatterns(patterns);
+  }
+
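+  /**
+   * Reads the format written by {@link #write(DataOutput)}: the pattern
+   * count, then for each pattern its length, its support and the
+   * length-prefixed UTF-8 bytes of each item.
+   */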
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    frequentPatterns.clear();
+    int length = in.readInt();
+    for (int i = 0; i < length; i++) {
+      List<String> items = new ArrayList<String>();
+      int itemsetLength = in.readInt();
+      long support = in.readLong();
+      for (int j = 0; j < itemsetLength; j++) {
+        int itemLength = in.readInt();
+        byte[] data = new byte[itemLength];
+        in.readFully(data);
+        items.add(Bytes.toString(data));
+      }
+      frequentPatterns.add(new Pair<List<String>, Long>(items, support));
+    }
+  }
+
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    out.writeInt(frequentPatterns.size());
+    for (Pair<List<String>, Long> pattern : frequentPatterns) {
+      out.writeInt(pattern.getFirst().size());
+      out.writeLong(pattern.getSecond());
+      for (String item : pattern.getFirst()) {
+        byte[] data = Bytes.toBytes(item);
+        out.writeInt(data.length);
+        out.write(data);
+      }
+    }
+  }
+
+  @Override
+  public final String toString() {
+    StringBuilder sb = new StringBuilder();
+    String sep = "";
+    for (Pair<List<String>, Long> pattern : frequentPatterns) {
+      sb.append(sep);
+      sb.append(pattern.toString());
+      sep = ", ";
+    }
+    return sb.toString();
+  }
+}