You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@mahout.apache.org by deneche abdelhakim <ad...@gmail.com> on 2010/01/08 09:10:26 UTC

Re: svn commit: r896922 [1/3] - in /lucene/mahout/trunk: core/src/main/java/org/apache/mahout/common/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ core/src/main/java/org/apache/mah

Hi Robin,

I'm getting a build failure with "mvn clean install":

[INFO] Compilation failure
/home/hakim/mahout-trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleGrouper.java:[89,24] cannot find symbol
symbol  : class KeyBasedStringTupleReducer
location: class org.apache.mahout.fpm.pfpgrowth.example.dataset.KeyBasedStringTupleGrouper



On Thu, Jan 7, 2010 at 5:48 PM,  <ro...@apache.org> wrote:
> Author: robinanil
> Date: Thu Jan  7 16:45:37 2010
> New Revision: 896922
>
> URL: http://svn.apache.org/viewvc?rev=896922&view=rev
> Log:
> MAHOUT-221 FP-Bonsai Pruning and Transaction Compaction
>
> Added:
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java
>    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/TransactionTreeTest.java
>    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/
>    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/DeliciousTagsExample.java
>    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/
>    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleCombiner.java
>    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleGrouper.java
>    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleMapper.java
> Modified:
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPGrowth.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTree.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTreeDepthCache.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FrequentPatternMaxHeap.java
>    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/Pattern.java
>    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthTest.java
>    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
>    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthJob.java
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java Thu Jan  7 16:45:37 2010
> @@ -22,31 +22,31 @@
>  import java.util.List;
>  import java.util.regex.Pattern;
>
> -public class StringRecordIterator implements Iterator<List<String>> {
> -
> +public class StringRecordIterator implements Iterator<Pair<List<String>,Long>> {
> +
>   private final Iterator<String> lineIterator;
>   private Pattern splitter = null;
> -
> +
>   public StringRecordIterator(FileLineIterable iterable, String pattern) {
>     this.lineIterator = iterable.iterator();
>     this.splitter = Pattern.compile(pattern);
>   }
> -
> +
>   @Override
>   public boolean hasNext() {
>     return lineIterator.hasNext();
>   }
> -
> +
>   @Override
> -  public List<String> next() {
> +  public Pair<List<String>,Long> next() {
>     String line = lineIterator.next();
>     String[] items = splitter.split(line);
> -    return Arrays.asList(items);
> +    return new Pair<List<String>,Long>(Arrays.asList(items), Long.valueOf(1));
>   }
> -
> +
>   @Override
>   public void remove() {
>     lineIterator.remove();
>   }
> -
> -}
> +
> +}
> \ No newline at end of file
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java Thu Jan  7 16:45:37 2010
> @@ -17,34 +17,32 @@
>
>  package org.apache.mahout.fpm.pfpgrowth;
>
> +import java.io.IOException;
> +import java.util.ArrayList;
> +import java.util.List;
> +
>  import org.apache.hadoop.io.Text;
>  import org.apache.hadoop.mapreduce.Mapper;
>  import org.apache.mahout.common.Pair;
>  import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
>
> -import java.io.IOException;
> -import java.util.ArrayList;
> -import java.util.List;
> -
>  /**
>  *
> - * {@link AggregatorMapper} outputs the pattern for each item in the pattern, so
> - * that reducer can group them and select the top K frequent patterns
> + * {@link AggregatorMapper} outputs the pattern for each item in the pattern, so that reducer can group them
> + * and select the top K frequent patterns
>  *
>  */
> -public class AggregatorMapper extends
> -    Mapper<Text, TopKStringPatterns, Text, TopKStringPatterns> {
> +public class AggregatorMapper extends Mapper<Text, TopKStringPatterns, Text, TopKStringPatterns> {
>
>   @Override
> -  protected void map(Text key, TopKStringPatterns values, Context context)
> -      throws IOException, InterruptedException {
> +  protected void map(Text key, TopKStringPatterns values, Context context) throws IOException,
> +      InterruptedException {
>     for (Pair<List<String>, Long> pattern : values.getPatterns()) {
>       for (String item : pattern.getFirst()) {
>         List<Pair<List<String>, Long>> patternSingularList = new ArrayList<Pair<List<String>, Long>>();
>         patternSingularList.add(pattern);
> -        context.setStatus("Aggregator Mapper:Grouping Patterns for "+item);
> -        context.write(new Text(item),
> -            new TopKStringPatterns(patternSingularList));
> +        context.setStatus("Aggregator Mapper:Grouping Patterns for " + item);
> +        context.write(new Text(item), new TopKStringPatterns(patternSingularList));
>       }
>     }
>
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java Thu Jan  7 16:45:37 2010
> @@ -17,11 +17,12 @@
>
>  package org.apache.mahout.fpm.pfpgrowth;
>
> +import java.io.IOException;
> +
>  import org.apache.hadoop.io.Text;
>  import org.apache.hadoop.mapreduce.Reducer;
>  import org.apache.mahout.common.Parameters;
>  import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
> -import java.io.IOException;
>
>  /**
>  *
>
> Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java?rev=896922&view=auto
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java (added)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java Thu Jan  7 16:45:37 2010
> @@ -0,0 +1,70 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.mahout.fpm.pfpgrowth;
> +
> +import java.util.Iterator;
> +import java.util.List;
> +
> +import org.apache.mahout.common.Pair;
> +
> +public final class MultiTransactionTreeIterator implements Iterator<List<Integer>> {
> +
> +  private Iterator<Pair<List<Integer>, Long>> pIterator = null;
> +
> +  private Pair<List<Integer>, Long> currentPattern = null;
> +
> +  private long currentCount = 0;
> +
> +  public MultiTransactionTreeIterator(Iterator<Pair<List<Integer>, Long>> iterator) {
> +    this.pIterator = iterator;
> +
> +    if (pIterator.hasNext()) {
> +      currentPattern = pIterator.next();
> +      currentCount = 0;
> +    } else {
> +      pIterator = null;
> +    }
> +
> +  }
> +
> +  @Override
> +  public boolean hasNext() {
> +    return pIterator != null;
> +  }
> +
> +  @Override
> +  public List<Integer> next() {
> +    List<Integer> returnable = currentPattern.getFirst();
> +    currentCount++;
> +    if (currentCount == currentPattern.getSecond().longValue()) {
> +      if (pIterator.hasNext()) {
> +        currentPattern = pIterator.next();
> +        currentCount = 0;
> +      } else {
> +        pIterator = null;
> +      }
> +    }
> +    return returnable;
> +  }
> +
> +  @Override
> +  public void remove() {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +}
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Thu Jan  7 16:45:37 2010
> @@ -42,7 +42,6 @@
>  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>  import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
>  import org.apache.hadoop.util.GenericsUtil;
> -import org.apache.mahout.common.IntegerTuple;
>  import org.apache.mahout.common.Pair;
>  import org.apache.mahout.common.Parameters;
>  import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
> @@ -52,11 +51,11 @@
>
>  /**
>  *
> - * Parallel FP Growth Driver Class. Runs each stage of PFPGrowth as described in
> - * the paper http://infolab.stanford.edu/~echang/recsys08-69.pdf
> + * Parallel FP Growth Driver Class. Runs each stage of PFPGrowth as described in the paper
> + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
>  *
>  */
> -public class PFPGrowth {
> +public final class PFPGrowth {
>   public static final Pattern SPLITTER = Pattern.compile("[ ,\t]*[,|\t][ ,\t]*");
>
>   private static final Logger log = LoggerFactory.getLogger(PFPGrowth.class);
> @@ -73,12 +72,11 @@
>    * @return Deserialized Feature Frequency List
>    * @throws IOException
>    */
> -  public static List<Pair<String, Long>> deserializeList(Parameters params,
> -      String key, Configuration conf) throws IOException {
> +  public static List<Pair<String, Long>> deserializeList(Parameters params, String key, Configuration conf)
> +      throws IOException {
>     List<Pair<String, Long>> list = new ArrayList<Pair<String, Long>>();
> -    conf.set(
> -            "io.serializations",
> -            "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> +    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
> +        + "org.apache.hadoop.io.serializer.WritableSerialization");
>
>     DefaultStringifier<List<Pair<String, Long>>> listStringifier = new DefaultStringifier<List<Pair<String, Long>>>(
>         conf, GenericsUtil.getClass(list));
> @@ -89,8 +87,8 @@
>   }
>
>   /**
> -   * Generates the gList(Group ID Mapping of Various frequent Features) Map from
> -   * the corresponding serialized representation
> +   * Generates the gList(Group ID Mapping of Various frequent Features) Map from the corresponding serialized
> +   * representation
>    *
>    * @param params
>    * @param key
> @@ -98,16 +96,14 @@
>    * @return Deserialized Group List
>    * @throws IOException
>    */
> -  public static Map<String, Long> deserializeMap(Parameters params, String key,
> -      Configuration conf) throws IOException {
> +  public static Map<String, Long> deserializeMap(Parameters params, String key, Configuration conf)
> +      throws IOException {
>     Map<String, Long> map = new HashMap<String, Long>();
> -    conf
> -        .set(
> -            "io.serializations",
> -            "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> +    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
> +        + "org.apache.hadoop.io.serializer.WritableSerialization");
>
> -    DefaultStringifier<Map<String, Long>> mapStringifier = new DefaultStringifier<Map<String, Long>>(
> -        conf, GenericsUtil.getClass(map));
> +    DefaultStringifier<Map<String, Long>> mapStringifier = new DefaultStringifier<Map<String, Long>>(conf,
> +        GenericsUtil.getClass(map));
>     String gListString = mapStringifier.toString(map);
>     gListString = params.get(key, gListString);
>     map = mapStringifier.fromString(gListString);
> @@ -115,33 +111,30 @@
>   }
>
>   /**
> -   * read the feature frequency List which is built at the end of the Parallel
> -   * counting job
> +   * read the feature frequency List which is built at the end of the Parallel counting job
>    *
>    * @param params
>    * @return Feature Frequency List
>    * @throws IOException
>    */
> -  public static List<Pair<String, Long>> readFList(Parameters params)
> -      throws IOException {
> +  public static List<Pair<String, Long>> readFList(Parameters params) throws IOException {
>     Writable key = new Text();
>     LongWritable value = new LongWritable();
>     int minSupport = Integer.valueOf(params.get("minSupport", "3"));
>     Configuration conf = new Configuration();
>
> -    FileSystem fs = FileSystem.get(new Path(params.get("output")
> -        + "/parallelcounting").toUri(), conf);
> -    FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output")
> -        + "/parallelcounting/part-*"));
> +    FileSystem fs = FileSystem.get(new Path(params.get("output") + "/parallelcounting").toUri(), conf);
> +    FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output") + "/parallelcounting/part-*"));
>
> -    PriorityQueue<Pair<String, Long>> queue = new PriorityQueue<Pair<String, Long>>(
> -        11, new Comparator<Pair<String, Long>>() {
> +    PriorityQueue<Pair<String, Long>> queue = new PriorityQueue<Pair<String, Long>>(11,
> +        new Comparator<Pair<String, Long>>() {
>
>           @Override
>           public int compare(Pair<String, Long> o1, Pair<String, Long> o2) {
>             int ret = o2.getSecond().compareTo(o1.getSecond());
> -            if (ret != 0)
> +            if (ret != 0) {
>               return ret;
> +            }
>             return o1.getFirst().compareTo(o2.getFirst());
>           }
>
> @@ -151,14 +144,16 @@
>       SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
>       // key is feature value is count
>       while (reader.next(key, value)) {
> -        if (value.get() < minSupport)
> +        if (value.get() < minSupport) {
>           continue;
> +        }
>         queue.add(new Pair<String, Long>(key.toString(), value.get()));
>       }
>     }
>     List<Pair<String, Long>> fList = new ArrayList<Pair<String, Long>>();
> -    while (queue.isEmpty() == false)
> +    while (queue.isEmpty() == false) {
>       fList.add(queue.poll());
> +    }
>     return fList;
>   }
>
> @@ -169,15 +164,13 @@
>    * @return List of TopK patterns for each string frequent feature
>    * @throws IOException
>    */
> -  public static List<Pair<String, TopKStringPatterns>> readFrequentPattern(
> -      Parameters params) throws IOException {
> +  public static List<Pair<String, TopKStringPatterns>> readFrequentPattern(Parameters params)
> +      throws IOException {
>
>     Configuration conf = new Configuration();
>
> -    FileSystem fs = FileSystem.get(new Path(params.get("output")
> -        + "/frequentPatterns").toUri(), conf);
> -    FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output")
> -        + "/frequentPatterns/part-*"));
> +    FileSystem fs = FileSystem.get(new Path(params.get("output") + "/frequentPatterns").toUri(), conf);
> +    FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output") + "/frequentPatterns/part-*"));
>
>     List<Pair<String, TopKStringPatterns>> ret = new ArrayList<Pair<String, TopKStringPatterns>>();
>     for (FileStatus fileStatus : outputFiles) {
> @@ -189,42 +182,42 @@
>
>   /**
>    *
> -   * @param params params should contain input and output locations as a string
> -   *        value, the additional parameters include minSupport(3),
> -   *        maxHeapSize(50), numGroups(1000)
> +   * @param params params should contain input and output locations as a string value, the additional
> +   *        parameters include minSupport(3), maxHeapSize(50), numGroups(1000)
>    * @throws IOException
>    * @throws ClassNotFoundException
>    * @throws InterruptedException
>    */
> -  public static void runPFPGrowth(Parameters params) throws IOException,
> -      InterruptedException, ClassNotFoundException {
> +  public static void runPFPGrowth(Parameters params) throws IOException, InterruptedException,
> +      ClassNotFoundException {
>     startParallelCounting(params);
>     startGroupingItems(params);
> +    startTransactionSorting(params);
>     startParallelFPGrowth(params);
>     startAggregating(params);
>   }
>
>   /**
> -   * Run the aggregation Job to aggregate the different TopK patterns and group
> -   * each Pattern by the features present in it and thus calculate the final Top
> -   * K frequent Patterns for each feature
> +   * Run the aggregation Job to aggregate the different TopK patterns and group each Pattern by the features
> +   * present in it and thus calculate the final Top K frequent Patterns for each feature
>    *
>    * @param params
>    * @throws IOException
>    * @throws InterruptedException
>    * @throws ClassNotFoundException
>    */
> -  public static void startAggregating(Parameters params) throws IOException,
> -      InterruptedException, ClassNotFoundException {
> +  public static void startAggregating(Parameters params) throws IOException, InterruptedException,
> +      ClassNotFoundException {
>
>     Configuration conf = new Configuration();
>     params.set("fList", "");
>     params.set("gList", "");
>     conf.set("pfp.parameters", params.toString());
> +    conf.set("mapred.compress.map.output", "true");
> +    conf.set("mapred.output.compression.type", "BLOCK");
>
>     String input = params.get("output") + "/fpgrowth";
> -    Job job = new Job(conf, "PFP Aggregator Driver running over input: "
> -        + input);
> +    Job job = new Job(conf, "PFP Aggregator Driver running over input: " + input);
>     job.setJarByClass(PFPGrowth.class);
>
>     job.setOutputKeyClass(Text.class);
> @@ -248,8 +241,7 @@
>   }
>
>   /**
> -   * Group the given Features into g groups as defined by the numGroups
> -   * parameter in params
> +   * Group the given Features into g groups as defined by the numGroups parameter in params
>    *
>    * @param params
>    * @throws IOException
> @@ -261,14 +253,15 @@
>
>     Map<String, Long> gList = new HashMap<String, Long>();
>     long maxPerGroup = fList.size() / numGroups;
> -    if (fList.size() != maxPerGroup * numGroups)
> +    if (fList.size() != maxPerGroup * numGroups) {
>       maxPerGroup++;
> +    }
>
>     long i = 0;
>     long groupID = 0;
>     for (Pair<String, Long> featureFreq : fList) {
>       String feature = featureFreq.getFirst();
> -      if (i / (maxPerGroup) == groupID) {
> +      if (i / maxPerGroup == groupID) {
>         gList.put(feature, groupID);
>       } else {
>         groupID++;
> @@ -291,15 +284,17 @@
>    * @throws InterruptedException
>    * @throws ClassNotFoundException
>    */
> -  public static void startParallelCounting(Parameters params)
> -      throws IOException, InterruptedException, ClassNotFoundException {
> +  public static void startParallelCounting(Parameters params) throws IOException, InterruptedException,
> +      ClassNotFoundException {
>
>     Configuration conf = new Configuration();
>     conf.set("pfp.parameters", params.toString());
>
> +    conf.set("mapred.compress.map.output", "true");
> +    conf.set("mapred.output.compression.type", "BLOCK");
> +
>     String input = params.get("input");
> -    Job job = new Job(conf, "Parallel Counting Driver running over input: "
> -        + input);
> +    Job job = new Job(conf, "Parallel Counting Driver running over input: " + input);
>     job.setJarByClass(PFPGrowth.class);
>
>     job.setOutputKeyClass(Text.class);
> @@ -325,26 +320,71 @@
>   }
>
>   /**
> -   * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of
> -   * group dependent shards
> +   * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of group dependent shards
>    *
>    * @param params
>    * @throws IOException
>    * @throws InterruptedException
>    * @throws ClassNotFoundException
>    */
> -  public static void startParallelFPGrowth(Parameters params)
> -      throws IOException, InterruptedException, ClassNotFoundException {
> +  public static void startTransactionSorting(Parameters params) throws IOException, InterruptedException,
> +      ClassNotFoundException {
>
>     Configuration conf = new Configuration();
> +    String gList = params.get("gList");
> +    params.set("gList", "");
>     conf.set("pfp.parameters", params.toString());
> -
> +    conf.set("mapred.compress.map.output", "true");
> +    conf.set("mapred.output.compression.type", "BLOCK");
>     String input = params.get("input");
> +    Job job = new Job(conf, "PFP Transaction Sorting running over input" + input);
> +    job.setJarByClass(PFPGrowth.class);
> +
> +    job.setMapOutputKeyClass(LongWritable.class);
> +    job.setMapOutputValueClass(TransactionTree.class);
> +
> +    job.setOutputKeyClass(LongWritable.class);
> +    job.setOutputValueClass(TransactionTree.class);
> +
> +    FileInputFormat.addInputPath(job, new Path(input));
> +    Path outPath = new Path(params.get("output") + "/sortedoutput");
> +    FileOutputFormat.setOutputPath(job, outPath);
> +
> +    FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
> +    if (dfs.exists(outPath)) {
> +      dfs.delete(outPath, true);
> +    }
> +
> +    job.setInputFormatClass(TextInputFormat.class);
> +    job.setMapperClass(TransactionSortingMapper.class);
> +    job.setReducerClass(TransactionSortingReducer.class);
> +    job.setOutputFormatClass(SequenceFileOutputFormat.class);
> +
> +    job.waitForCompletion(true);
> +    params.set("gList", gList);
> +  }
> +
> +  /**
> +   * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of group dependent shards
> +   *
> +   * @param params
> +   * @throws IOException
> +   * @throws InterruptedException
> +   * @throws ClassNotFoundException
> +   */
> +  public static void startParallelFPGrowth(Parameters params) throws IOException, InterruptedException,
> +      ClassNotFoundException {
> +
> +    Configuration conf = new Configuration();
> +    conf.set("pfp.parameters", params.toString());
> +    conf.set("mapred.compress.map.output", "true");
> +    conf.set("mapred.output.compression.type", "BLOCK");
> +    String input = params.get("output") + "/sortedoutput";
>     Job job = new Job(conf, "PFP Growth Driver running over input" + input);
>     job.setJarByClass(PFPGrowth.class);
>
>     job.setMapOutputKeyClass(LongWritable.class);
> -    job.setMapOutputValueClass(IntegerTuple.class);
> +    job.setMapOutputValueClass(TransactionTree.class);
>
>     job.setOutputKeyClass(Text.class);
>     job.setOutputValueClass(TopKStringPatterns.class);
> @@ -358,8 +398,9 @@
>       dfs.delete(outPath, true);
>     }
>
> -    job.setInputFormatClass(TextInputFormat.class);
> +    job.setInputFormatClass(SequenceFileInputFormat.class);
>     job.setMapperClass(ParallelFPGrowthMapper.class);
> +    job.setCombinerClass(ParallelFPGrowthCombiner.class);
>     job.setReducerClass(ParallelFPGrowthReducer.class);
>     job.setOutputFormatClass(SequenceFileOutputFormat.class);
>
> @@ -374,11 +415,9 @@
>    * @return Serialized String representation of List
>    * @throws IOException
>    */
> -  private static String serializeList(List<Pair<String, Long>> list,
> -      Configuration conf) throws IOException {
> -    conf.set(
> -            "io.serializations",
> -            "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> +  private static String serializeList(List<Pair<String, Long>> list, Configuration conf) throws IOException {
> +    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
> +        + "org.apache.hadoop.io.serializer.WritableSerialization");
>     DefaultStringifier<List<Pair<String, Long>>> listStringifier = new DefaultStringifier<List<Pair<String, Long>>>(
>         conf, GenericsUtil.getClass(list));
>     return listStringifier.toString(list);
> @@ -392,13 +431,11 @@
>    * @return Serialized String representation of the GList Map
>    * @throws IOException
>    */
> -  private static String serializeMap(Map<String, Long> map, Configuration conf)
> -      throws IOException {
> -    conf.set(
> -            "io.serializations",
> -            "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> -    DefaultStringifier<Map<String, Long>> mapStringifier = new DefaultStringifier<Map<String, Long>>(
> -        conf, GenericsUtil.getClass(map));
> +  private static String serializeMap(Map<String, Long> map, Configuration conf) throws IOException {
> +    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
> +        + "org.apache.hadoop.io.serializer.WritableSerialization");
> +    DefaultStringifier<Map<String, Long>> mapStringifier = new DefaultStringifier<Map<String, Long>>(conf,
> +        GenericsUtil.getClass(map));
>     return mapStringifier.toString(map);
>   }
>  }
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java Thu Jan  7 16:45:37 2010
> @@ -1,5 +1,5 @@
>  /**
> - * Licensed to the Apache Software Foundation (ASF) under one or more
> +w * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
> @@ -17,46 +17,44 @@
>
>  package org.apache.mahout.fpm.pfpgrowth;
>
> +import java.io.IOException;
> +import java.util.regex.Pattern;
> +
>  import org.apache.hadoop.io.LongWritable;
>  import org.apache.hadoop.io.Text;
>  import org.apache.hadoop.mapreduce.Mapper;
>  import org.apache.mahout.common.Parameters;
>
> -import java.io.IOException;
> -import java.util.regex.Pattern;
> -
>  /**
>  *
> - * {@link ParallelCountingMapper} maps all items in a particular transaction
> - * like the way it is done in Hadoop WordCount example
> + * {@link ParallelCountingMapper} maps all items in a particular transaction like the way it is done in Hadoop
> + * WordCount example
>  *
>  */
> -public class ParallelCountingMapper extends
> -    Mapper<LongWritable, Text, Text, LongWritable> {
> +public class ParallelCountingMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
>
>   private static final LongWritable one = new LongWritable(1);
>
>   private Pattern splitter = null;
>
>   @Override
> -  protected void map(LongWritable offset, Text input, Context context)
> -      throws IOException, InterruptedException {
> +  protected void map(LongWritable offset, Text input, Context context) throws IOException,
> +      InterruptedException {
>
>     String[] items = splitter.split(input.toString());
> -    for (String item : items){
> -      if(item.trim().length()==0) continue;
> -      context.setStatus("Parallel Counting Mapper: "+  item);
> +    for (String item : items) {
> +      if (item.trim().length() == 0) {
> +        continue;
> +      }
> +      context.setStatus("Parallel Counting Mapper: " + item);
>       context.write(new Text(item), one);
>     }
>   }
>
>   @Override
> -  protected void setup(Context context) throws IOException,
> -      InterruptedException {
> +  protected void setup(Context context) throws IOException, InterruptedException {
>     super.setup(context);
> -    Parameters params = Parameters.fromString(context.getConfiguration().get(
> -        "pfp.parameters", ""));
> -    splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER
> -        .toString()));
> +    Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
> +    splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER.toString()));
>   }
>  }
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java Thu Jan  7 16:45:37 2010
> @@ -17,23 +17,21 @@
>
>  package org.apache.mahout.fpm.pfpgrowth;
>
> +import java.io.IOException;
> +
>  import org.apache.hadoop.io.LongWritable;
>  import org.apache.hadoop.io.Text;
>  import org.apache.hadoop.mapreduce.Reducer;
>
> -import java.io.IOException;
> -
>  /**
> - * {@link ParallelCountingReducer} sums up the item count and output the item
> - * and the count This can also be used as a local Combiner. A simple summing
> - * reducer
> + * {@link ParallelCountingReducer} sums up the item count and outputs the item and the count. This can also
> + * be used as a local Combiner. A simple summing reducer.
>  */
> -public class ParallelCountingReducer extends
> -    Reducer<Text, LongWritable, Text, LongWritable> {
> +public class ParallelCountingReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
>
>   @Override
> -  protected void reduce(Text key, Iterable<LongWritable> values, Context context)
> -      throws IOException, InterruptedException {
> +  protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
> +      InterruptedException {
>     long sum = 0;
>     for (LongWritable value : values) {
>       context.setStatus("Parallel Counting Reducer :" + key);
>
> Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java?rev=896922&view=auto
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java (added)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java Thu Jan  7 16:45:37 2010
> @@ -0,0 +1,55 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.mahout.fpm.pfpgrowth;
> +
> +import java.io.IOException;
> +import java.util.Iterator;
> +import java.util.List;
> +
> +import org.apache.hadoop.io.LongWritable;
> +import org.apache.hadoop.mapreduce.Reducer;
> +import org.apache.mahout.common.Pair;
> +
> +/**
> + * {@link ParallelFPGrowthCombiner} takes each group of dependent transactions
> + * and compacts it into a TransactionTree structure
> + */
> +
> +public class ParallelFPGrowthCombiner extends
> +    Reducer<LongWritable, TransactionTree, LongWritable, TransactionTree> {
> +
> +  /**
> +   * Folds every pattern of every incoming tree into one {@link TransactionTree} and writes the result of
> +   * {@code getCompressedTree()} on that merged tree under the unchanged key.
> +   *
> +   * @param key key shared by the incoming trees; written back as-is
> +   * @param values trees whose patterns are merged
> +   * @param context Hadoop context that receives the single merged tree
> +   */
> +  @Override
> +  protected void reduce(LongWritable key, Iterable<TransactionTree> values,
> +      Context context) throws IOException, InterruptedException {
> +    TransactionTree cTree = new TransactionTree();
> +    for (TransactionTree tr : values) {
> +      Iterator<Pair<List<Integer>, Long>> it = tr.getIterator();
> +      while (it.hasNext()) {
> +        Pair<List<Integer>, Long> p = it.next();
> +        // addPattern returns the number of nodes it created; nothing here needs that number
> +        cTree.addPattern(p.getFirst(), p.getSecond());
> +      }
> +    }
> +
> +    context.write(key, cTree.getCompressedTree());
> +  }
> +
> +}
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java Thu Jan  7 16:45:37 2010
> @@ -17,93 +17,73 @@
>
>  package org.apache.mahout.fpm.pfpgrowth;
>
> -import org.apache.hadoop.io.LongWritable;
> -import org.apache.hadoop.io.Text;
> -import org.apache.hadoop.mapreduce.Mapper;
> -import org.apache.mahout.common.IntegerTuple;
> -import org.apache.mahout.common.Pair;
> -import org.apache.mahout.common.Parameters;
>  import java.io.IOException;
> -import java.util.ArrayList;
> -import java.util.Collections;
>  import java.util.HashMap;
>  import java.util.HashSet;
> +import java.util.Iterator;
>  import java.util.List;
>  import java.util.Map;
> -import java.util.Set;
>  import java.util.Map.Entry;
> -import java.util.regex.Pattern;
> +import java.util.Set;
> +
> +import org.apache.hadoop.io.LongWritable;
> +import org.apache.hadoop.mapreduce.Mapper;
> +import org.apache.mahout.common.Pair;
> +import org.apache.mahout.common.Parameters;
>
>  /**
> - * {@link ParallelFPGrowthMapper} maps each transaction to all unique items
> - * groups in the transaction. mapper outputs the group id as key and the
> - * transaction as value
> + * {@link ParallelFPGrowthMapper} maps each transaction to every unique item group that occurs in it. The
> + * mapper outputs the group id as key and the transaction as value.
>  *
>  */
>  public class ParallelFPGrowthMapper extends
> -    Mapper<LongWritable, Text, LongWritable, IntegerTuple> {
> -
> -  private final Map<String, Integer> fMap = new HashMap<String, Integer>();
> +    Mapper<LongWritable, TransactionTree, LongWritable, TransactionTree> {
>
>   private final Map<Integer, Long> gListInt = new HashMap<Integer, Long>();
>
> -  private Pattern splitter = null;
> -
>   @Override
> -  protected void map(LongWritable offset, Text input, Context context)
> -      throws IOException, InterruptedException {
> -
> -    String[] items = splitter.split(input.toString());
> -
> -    List<Integer> itemSet = new ArrayList<Integer>();
> -    for (String item : items) // remove items not in the fList
> -    {
> -      if (fMap.containsKey(item) && item.trim().length() != 0)
> -        itemSet.add(fMap.get(item));
> -    }
> -
> -    Collections.sort(itemSet);
> +  protected void map(LongWritable offset, TransactionTree input, Context context) throws IOException,
> +      InterruptedException {
>
> -    Integer[] prunedItems = itemSet.toArray(new Integer[itemSet.size()]);
> -
> -    Set<Long> groups = new HashSet<Long>();
> -    for (int j = prunedItems.length - 1; j >= 0; j--) { // generate group
> -                                                        // dependent
> -                                                        // shards
> -      Integer item = prunedItems[j];
> -      Long groupID = gListInt.get(item);
> -      if (groups.contains(groupID) == false) {
> -        Integer[] tempItems = new Integer[j + 1];
> -        System.arraycopy(prunedItems, 0, tempItems, 0, j + 1);
> -        context
> -            .setStatus("Parallel FPGrowth: Generating Group Dependent transactions for: "
> -                + item);
> -        context.write(new LongWritable(groupID), new IntegerTuple(tempItems));
> +    Iterator<Pair<List<Integer>, Long>> it = input.getIterator();
> +    while (it.hasNext()) {
> +      Pair<List<Integer>, Long> pattern = it.next();
> +      Integer[] prunedItems = pattern.getFirst().toArray(new Integer[pattern.getFirst().size()]);
> +
> +      Set<Long> groups = new HashSet<Long>();
> +      for (int j = prunedItems.length - 1; j >= 0; j--) { // generate group
> +        // dependent
> +        // shards
> +        Integer item = prunedItems[j];
> +        Long groupID = gListInt.get(item);
> +
> +        if (groups.contains(groupID) == false) {
> +          Integer[] tempItems = new Integer[j + 1];
> +          System.arraycopy(prunedItems, 0, tempItems, 0, j + 1);
> +          context.setStatus("Parallel FPGrowth: Generating Group Dependent transactions for: " + item);
> +          context.write(new LongWritable(groupID), new TransactionTree(tempItems, pattern.getSecond()));
> +        }
> +        groups.add(groupID);
>       }
> -      groups.add(groupID);
>     }
>
>   }
>
>   @Override
> -  protected void setup(Context context) throws IOException,
> -      InterruptedException {
> +  protected void setup(Context context) throws IOException, InterruptedException {
>     super.setup(context);
> -    Parameters params = Parameters.fromString(context.getConfiguration().get(
> -        "pfp.parameters", ""));
> +    Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
>
> +    Map<String, Integer> fMap = new HashMap<String, Integer>();
>     int i = 0;
> -    for (Pair<String, Long> e : PFPGrowth.deserializeList(params, "fList",
> -        context.getConfiguration())) {
> +    for (Pair<String, Long> e : PFPGrowth.deserializeList(params, "fList", context.getConfiguration())) {
>       fMap.put(e.getFirst(), i++);
>     }
> -
> -    for (Entry<String, Long> e : PFPGrowth.deserializeMap(params, "gList",
> -        context.getConfiguration()).entrySet()) {
> +
> +    for (Entry<String, Long> e : PFPGrowth.deserializeMap(params, "gList", context.getConfiguration())
> +        .entrySet()) {
>       gListInt.put(fMap.get(e.getKey()), e.getValue());
>     }
> -    splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER
> -        .toString()));
>
>   }
>  }
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java Thu Jan  7 16:45:37 2010
> @@ -17,27 +17,30 @@
>
>  package org.apache.mahout.fpm.pfpgrowth;
>
> +import java.io.IOException;
> +import java.util.ArrayList;
> +import java.util.Collections;
> +import java.util.Comparator;
> +import java.util.HashMap;
> +import java.util.HashSet;
> +import java.util.Iterator;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.Map.Entry;
> +
> +import org.apache.commons.lang.mutable.MutableLong;
>  import org.apache.hadoop.io.LongWritable;
>  import org.apache.hadoop.io.Text;
>  import org.apache.hadoop.mapreduce.Reducer;
> -import org.apache.mahout.common.IntegerTuple;
>  import org.apache.mahout.common.Pair;
>  import org.apache.mahout.common.Parameters;
> +import org.apache.mahout.fpm.pfpgrowth.convertors.ContextStatusUpdater;
>  import org.apache.mahout.fpm.pfpgrowth.convertors.ContextWriteOutputCollector;
>  import org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerStringOutputConvertor;
> -import org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerTupleIterator;
>  import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
>  import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth;
>  import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPTreeDepthCache;
>
> -import java.io.IOException;
> -import java.util.ArrayList;
> -import java.util.HashMap;
> -import java.util.HashSet;
> -import java.util.List;
> -import java.util.Map;
> -import java.util.Map.Entry;
> -
>  /**
>  * {@link ParallelFPGrowthReducer} takes each group of transactions and runs
>  * Vanilla FPGrowth on it and outputs the Top K frequent Patterns for each
> @@ -46,14 +49,16 @@
>  */
>
>  public class ParallelFPGrowthReducer extends
> -    Reducer<LongWritable, IntegerTuple, Text, TopKStringPatterns> {
> +    Reducer<LongWritable, TransactionTree, Text, TopKStringPatterns> {
>
>   private final List<Pair<Integer, Long>> fList = new ArrayList<Pair<Integer, Long>>();
> -
> +
>   private final List<String> featureReverseMap = new ArrayList<String>();
> -
> +
>   private final Map<String, Integer> fMap = new HashMap<String, Integer>();
>
> +  private final List<String> fRMap = new ArrayList<String>();
> +
>   private final Map<Long, List<Integer>> groupFeatures = new HashMap<Long, List<Integer>>();
>
>   private int maxHeapSize = 50;
> @@ -61,48 +66,81 @@
>   private int minSupport = 3;
>
>   @Override
> -  protected void reduce(LongWritable key, Iterable<IntegerTuple> values,
> +  protected void reduce(LongWritable key, Iterable<TransactionTree> values,
>       Context context) throws IOException {
> +    TransactionTree cTree = new TransactionTree();
> +    int nodes = 0;
> +    for (TransactionTree tr : values) {
> +      Iterator<Pair<List<Integer>, Long>> it = tr.getIterator();
> +      while (it.hasNext()) {
> +        Pair<List<Integer>, Long> p = it.next();
> +        nodes += cTree.addPattern(p.getFirst(), p.getSecond());
> +      }
> +    }
> +
> +    List<Pair<Integer, Long>> localFList = new ArrayList<Pair<Integer, Long>>();
> +    for (Entry<Integer, MutableLong> fItem : cTree.generateFList().entrySet()) {
> +      localFList.add(new Pair<Integer, Long>(fItem.getKey(), fItem.getValue()
> +          .toLong()));
> +
> +    }
> +
> +    Collections.sort(localFList, new Comparator<Pair<Integer, Long>>() {
> +
> +      @Override
> +      public int compare(Pair<Integer, Long> o1, Pair<Integer, Long> o2) {
> +        int ret = o2.getSecond().compareTo(o1.getSecond());
> +        if (ret != 0) {
> +          return ret;
> +        }
> +        return o1.getFirst().compareTo(o2.getFirst());
> +      }
> +
> +    });
> +
> +
>     FPGrowth<Integer> fpGrowth = new FPGrowth<Integer>();
>     fpGrowth
>         .generateTopKFrequentPatterns(
> -            new IntegerTupleIterator(values.iterator()),
> -            fList,
> +            cTree.getIterator(),
> +            localFList,
>             minSupport,
>             maxHeapSize,
>             new HashSet<Integer>(groupFeatures.get(key.get())),
>             new IntegerStringOutputConvertor(
> -                new ContextWriteOutputCollector<LongWritable, IntegerTuple, Text, TopKStringPatterns>(
> -                    context), featureReverseMap));
> +                new ContextWriteOutputCollector<LongWritable, TransactionTree, Text, TopKStringPatterns>(
> +                    context), featureReverseMap),
> +            new ContextStatusUpdater<LongWritable, TransactionTree, Text, TopKStringPatterns>(
> +                context));
>   }
>
>   @Override
> -  protected void setup(Context context) throws IOException, InterruptedException {
> +  protected void setup(Context context) throws IOException,
> +      InterruptedException {
>
>     super.setup(context);
>     Parameters params = Parameters.fromString(context.getConfiguration().get(
>         "pfp.parameters", ""));
> -
> -
> -
> +
>     int i = 0;
> -    for(Pair<String, Long> e: PFPGrowth.deserializeList(params, "fList", context
> -        .getConfiguration()))
> -    {
> +    for (Pair<String, Long> e : PFPGrowth.deserializeList(params, "fList",
> +        context.getConfiguration())) {
>       featureReverseMap.add(e.getFirst());
>       fMap.put(e.getFirst(), i);
> +      fRMap.add(e.getFirst());
>       fList.add(new Pair<Integer, Long>(i++, e.getSecond()));
> +
>     }
> -
> +
>     Map<String, Long> gList = PFPGrowth.deserializeMap(params, "gList", context
>         .getConfiguration());
> -
> +
>     for (Entry<String, Long> entry : gList.entrySet()) {
>       List<Integer> groupList = groupFeatures.get(entry.getValue());
>       Integer itemInteger = fMap.get(entry.getKey());
> -      if (groupList != null)
> +      if (groupList != null) {
>         groupList.add(itemInteger);
> -      else {
> +      } else {
>         groupList = new ArrayList<Integer>();
>         groupList.add(itemInteger);
>         groupFeatures.put(entry.getValue(), groupList);
>
> Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java?rev=896922&view=auto
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java (added)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java Thu Jan  7 16:45:37 2010
> @@ -0,0 +1,85 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.mahout.fpm.pfpgrowth;
> +
> +import java.io.IOException;
> +import java.util.ArrayList;
> +import java.util.Arrays;
> +import java.util.Collections;
> +import java.util.HashMap;
> +import java.util.HashSet;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.Set;
> +import java.util.regex.Pattern;
> +
> +import org.apache.hadoop.io.LongWritable;
> +import org.apache.hadoop.io.Text;
> +import org.apache.hadoop.mapreduce.Mapper;
> +import org.apache.mahout.common.Pair;
> +import org.apache.mahout.common.Parameters;
> +
> +/**
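> + * {@link TransactionSortingMapper} converts each transaction into the sorted list of fList indices of its
> + * distinct items, dropping items that are not in the fList. The mapper outputs the smallest index as key and
> + * the transaction, wrapped in a {@link TransactionTree} with support 1, as value.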
> + *
> + */
> +public class TransactionSortingMapper extends Mapper<LongWritable, Text, LongWritable, TransactionTree> {
> +
> +  private final Map<String, Integer> fMap = new HashMap<String, Integer>();
> +
> +  private Pattern splitter = null;
> +
> +  @Override
> +  protected void map(LongWritable offset, Text input, Context context) throws IOException,
> +      InterruptedException {
> +
> +    String[] items = splitter.split(input.toString());
> +    Set<String> uniqueItems = new HashSet<String>(Arrays.asList(items));
> +
> +    List<Integer> itemSet = new ArrayList<Integer>();
> +    for (String item : uniqueItems) { // remove items not in the fList
> +      if (fMap.containsKey(item) && item.trim().length() != 0) {
> +        itemSet.add(fMap.get(item));
> +      }
> +    }
> +
> +    Collections.sort(itemSet);
> +
> +    Integer[] prunedItems = itemSet.toArray(new Integer[itemSet.size()]);
> +
> +    if (prunedItems.length > 0) {
> +      context.write(new LongWritable(prunedItems[0]), new TransactionTree(prunedItems, 1L));
> +    }
> +
> +  }
> +
> +  @Override
> +  protected void setup(Context context) throws IOException, InterruptedException {
> +    super.setup(context);
> +    Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
> +
> +    int i = 0;
> +    for (Pair<String, Long> e : PFPGrowth.deserializeList(params, "fList", context.getConfiguration())) {
> +      fMap.put(e.getFirst(), i++);
> +    }
> +
> +    splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER.toString()));
> +
> +  }
> +}
>
> Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java?rev=896922&view=auto
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java (added)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java Thu Jan  7 16:45:37 2010
> @@ -0,0 +1,42 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.mahout.fpm.pfpgrowth;
> +
> +import java.io.IOException;
> +
> +import org.apache.hadoop.io.LongWritable;
> +import org.apache.hadoop.mapreduce.Reducer;
> +/**
> + * {@link TransactionSortingReducer} passes every incoming {@link TransactionTree} through unchanged,
> + * re-keying all of them to the constant key 1.
> + *
> + */
> +
> +public class TransactionSortingReducer extends
> +    Reducer<LongWritable, TransactionTree, LongWritable, TransactionTree> {
> +
> +  /** Constant key under which every tree is written, whatever key it arrived with. */
> +  private static final LongWritable one = new LongWritable(1);
> +
> +  /**
> +   * Writes each incoming tree back out unchanged, replacing its key with the constant {@code 1}.
> +   */
> +  @Override
> +  protected void reduce(LongWritable key, Iterable<TransactionTree> values,
> +      Context context) throws IOException, InterruptedException {
> +    for (TransactionTree tree : values) {
> +      context.write(one, tree);
> +    }
> +  }
> +}
>
> Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java?rev=896922&view=auto
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java (added)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java Thu Jan  7 16:45:37 2010
> @@ -0,0 +1,461 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.mahout.fpm.pfpgrowth;
> +
> +import java.io.DataInput;
> +import java.io.DataOutput;
> +import java.io.IOException;
> +import java.util.ArrayList;
> +import java.util.Arrays;
> +import java.util.Collections;
> +import java.util.Comparator;
> +import java.util.HashMap;
> +import java.util.Iterator;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.Stack;
> +
> +import org.apache.commons.lang.mutable.MutableLong;
> +import org.apache.hadoop.io.VIntWritable;
> +import org.apache.hadoop.io.VLongWritable;
> +import org.apache.hadoop.io.Writable;
> +import org.apache.mahout.common.Pair;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +public final class TransactionTree implements Writable {
> +
> +  public final class TransactionTreeIterator implements Iterator<Pair<List<Integer>, Long>> {
> +
> +    Stack<int[]> depth = new Stack<int[]>();
> +
> +    public TransactionTreeIterator() {
> +      depth.push(new int[] {0, -1});
> +    }
> +
> +    @Override
> +    public boolean hasNext() {
> +      if (depth.isEmpty()) {
> +        return false;
> +      }
> +      return true;
> +    }
> +
> +    @Override
> +    public Pair<List<Integer>, Long> next() {
> +
> +      long sum = 0;
> +      int childId = 0;
> +      do {
> +        int[] top = depth.peek();
> +        while (top[1] + 1 == childCount[top[0]]) {
> +          depth.pop();
> +          top = depth.peek();
> +        }
> +        if (depth.isEmpty()) {
> +          return null;
> +        }
> +        top[1]++;
> +        childId = nodeChildren[top[0]][top[1]];
> +        depth.push(new int[] {childId, -1});
> +
> +        sum = 0;
> +        for (int i = childCount[childId] - 1; i >= 0; i--) {
> +          sum += nodeCount[nodeChildren[childId][i]];
> +        }
> +      } while (sum == nodeCount[childId]);
> +
> +      List<Integer> data = new ArrayList<Integer>();
> +      Iterator<int[]> it = depth.iterator();
> +      it.next();
> +      while (it.hasNext()) {
> +        data.add(attribute[it.next()[0]]);
> +      }
> +
> +      Pair<List<Integer>, Long> returnable = new Pair<List<Integer>, Long>(data, nodeCount[childId] - sum);
> +
> +      int[] top = depth.peek();
> +      while (top[1] + 1 == childCount[top[0]]) {
> +        depth.pop();
> +        if (depth.isEmpty()) {
> +          break;
> +        }
> +        top = depth.peek();
> +      }
> +      return returnable;
> +    }
> +
> +    @Override
> +    public void remove() {
> +      throw new UnsupportedOperationException();
> +    }
> +
> +  }
> +
> +  private static final int DEFAULT_CHILDREN_INITIAL_SIZE = 2;
> +
> +  private static final int DEFAULT_INITIAL_SIZE = 8;
> +
> +  private static final float GROWTH_RATE = 1.5f;
> +
> +  private static final Logger log = LoggerFactory.getLogger(TransactionTree.class);
> +
> +  private static final int ROOTNODEID = 0;
> +
> +  private int[] attribute;
> +
> +  private int[] childCount;
> +
> +  private int[][] nodeChildren;
> +
> +  private long[] nodeCount;
> +
> +  private int nodes = 0;
> +
> +  private boolean representedAsList = false;
> +
> +  private List<Pair<List<Integer>, Long>> transactionSet = new ArrayList<Pair<List<Integer>, Long>>();
> +
> +  /** Creates an empty tree-backed instance sized with DEFAULT_INITIAL_SIZE. */
> +  public TransactionTree() {
> +    this(DEFAULT_INITIAL_SIZE);
> +  }
> +
> +  /**
> +   * Creates an empty tree-backed instance and its root node.
> +   *
> +   * @param size requested length of the node arrays; values below DEFAULT_INITIAL_SIZE are raised to it
> +   */
> +  public TransactionTree(int size) {
> +    int capacity = Math.max(size, DEFAULT_INITIAL_SIZE);
> +    representedAsList = false;
> +    attribute = new int[capacity];
> +    childCount = new int[capacity];
> +    nodeCount = new long[capacity];
> +    nodeChildren = new int[capacity][];
> +    createRootNode();
> +  }
> +
> +  /**
> +   * Creates a list-backed instance holding a single transaction.
> +   *
> +   * @param items items of the transaction
> +   * @param support count stored with the transaction
> +   */
> +  public TransactionTree(Integer[] items, Long support) {
> +    List<Integer> itemList = Arrays.asList(items);
> +    transactionSet.add(new Pair<List<Integer>, Long>(itemList, support));
> +    representedAsList = true;
> +  }
> +
> +  /**
> +   * Creates a list-backed instance that uses the given list (not a copy) as its transaction set.
> +   *
> +   * @param transactionSet transactions paired with their counts
> +   */
> +  public TransactionTree(List<Pair<List<Integer>, Long>> transactionSet) {
> +    this.transactionSet = transactionSet;
> +    representedAsList = true;
> +  }
> +
> +  public void addChild(int parentNodeId, int childnodeId) {
> +    int length = childCount[parentNodeId];
> +    if (length >= nodeChildren[parentNodeId].length) {
> +      resizeChildren(parentNodeId);
> +    }
> +    nodeChildren[parentNodeId][length++] = childnodeId;
> +    childCount[parentNodeId] = length;
> +
> +  }
> +
> +  public boolean addCount(int nodeId, long nextNodeCount) {
> +    if (nodeId < nodes) {
> +      this.nodeCount[nodeId] += nextNodeCount;
> +      return true;
> +    }
> +    return false;
> +  }
> +
> +  public int addPattern(List<Integer> myList, long addCount) {
> +    int temp = TransactionTree.ROOTNODEID;
> +    int ret = 0;
> +    boolean addCountMode = true;
> +    int child = -1;
> +    for (int attributeValue : myList) {
> +
> +      if (addCountMode) {
> +        child = childWithAttribute(temp, attributeValue);
> +        if (child == -1) {
> +          addCountMode = false;
> +        } else {
> +          addCount(child, addCount);
> +          temp = child;
> +        }
> +      }
> +      if (!addCountMode) {
> +        child = createNode(temp, attributeValue, addCount);
> +        temp = child;
> +        ret++;
> +      }
> +    }
> +    return ret;
> +  }
> +
> +  public int attribute(int nodeId) {
> +    return this.attribute[nodeId];
> +  }
> +
> +  public int childAtIndex(int nodeId, int index) {
> +    if (childCount[nodeId] < index) {
> +      return -1;
> +    }
> +    return nodeChildren[nodeId][index];
> +  }
> +
> +  public int childCount() {
> +    int sum = 0;
> +    for (int i = 0; i < nodes; i++) {
> +      sum += childCount[i];
> +    }
> +    return sum;
> +  }
> +
> +  public int childCount(int nodeId) {
> +    return childCount[nodeId];
> +  }
> +
> +  public int childWithAttribute(int nodeId, int childAttribute) {
> +    int length = childCount[nodeId];
> +    for (int i = 0; i < length; i++) {
> +      if (attribute[nodeChildren[nodeId][i]] == childAttribute) {
> +        return nodeChildren[nodeId][i];
> +      }
> +    }
> +    return -1;
> +  }
> +
> +  public long count(int nodeId) {
> +    return nodeCount[nodeId];
> +  }
> +
> +  public Map<Integer, MutableLong> generateFList() {
> +    Map<Integer, MutableLong> frequencyList = new HashMap<Integer, MutableLong>();
> +    Iterator<Pair<List<Integer>, Long>> it = getIterator();
> +    int items = 0;
> +    int count = 0;
> +    while (it.hasNext()) {
> +      Pair<List<Integer>, Long> p = it.next();
> +      items += p.getFirst().size();
> +      count++;
> +      for (Integer i : p.getFirst()) {
> +        if (frequencyList.containsKey(i) == false) {
> +          frequencyList.put(i, new MutableLong(0));
> +        }
> +        frequencyList.get(i).add(p.getSecond());
> +      }
> +    }
> +    return frequencyList;
> +  }
> +
> +  public TransactionTree getCompressedTree() {
> +    TransactionTree ctree = new TransactionTree();
> +    Iterator<Pair<List<Integer>, Long>> it = getIterator();
> +    final Map<Integer, MutableLong> fList = generateFList();
> +    int node = 0;
> +    Comparator<Integer> comparator = new Comparator<Integer>() {
> +
> +      @Override
> +      public int compare(Integer o1, Integer o2) {
> +        return fList.get(o2).compareTo(fList.get(o1));
> +      }
> +
> +    };
> +    int size = 0;
> +    List<Pair<List<Integer>, Long>> compressedTransactionSet = new ArrayList<Pair<List<Integer>, Long>>();
> +    while (it.hasNext()) {
> +      Pair<List<Integer>, Long> p = it.next();
> +      Collections.sort(p.getFirst(), comparator);
> +      compressedTransactionSet.add(p);
> +      node += ctree.addPattern(p.getFirst(), p.getSecond());
> +      size += p.getFirst().size() + 2;
> +    }
> +
> +    log.debug("Nodes in UnCompressed Tree: {} ", nodes);
> +    log
> +        .debug("UnCompressed Tree Size: {}", (this.nodes * 4 * 4 + this.childCount() * 4)
> +            / (double) 1000000);
> +    log.debug("Nodes in Compressed Tree: {} ", node);
> +    log.debug("Compressed Tree Size: {}", (node * 4 * 4 + ctree.childCount() * 4) / (double) 1000000);
> +    log.debug("TransactionSet Size: {}", (size * 4) / (double) 1000000);
> +    if (node * 4 * 4 + ctree.childCount() * 4 <= size * 4) {
> +      return ctree;
> +    } else {
> +      ctree = new TransactionTree(compressedTransactionSet);
> +      return ctree;
> +    }
> +  }
> +
> +  public Iterator<Pair<List<Integer>, Long>> getIterator() {
> +    if (this.isTreeEmpty() && !representedAsList) {
> +      throw new IllegalStateException("This is a bug. Please report this to mahout-user list");
> +    } else if (representedAsList) {
> +      return transactionSet.iterator();
> +    } else {
> +      return new TransactionTreeIterator();
> +    }
> +  }
> +
> +  public boolean isTreeEmpty() {
> +    return nodes <= 1;
> +  }
> +
> +  @Override
> +  public void readFields(DataInput in) throws IOException {
> +    representedAsList = in.readBoolean();
> +
> +    VIntWritable vInt = new VIntWritable();
> +    VLongWritable vLong = new VLongWritable();
> +
> +    if (representedAsList) {
> +      transactionSet = new ArrayList<Pair<List<Integer>, Long>>();
> +      vInt.readFields(in);
> +      int numTransactions = vInt.get();
> +      for (int i = 0; i < numTransactions; i++) {
> +        vLong.readFields(in);
> +        Long support = vLong.get();
> +
> +        vInt.readFields(in);
> +        int length = vInt.get();
> +
> +        Integer[] items = new Integer[length];
> +        for (int j = 0; j < length; j++) {
> +          vInt.readFields(in);
> +          items[j] = vInt.get();
> +        }
> +        Pair<List<Integer>, Long> transaction = new Pair<List<Integer>, Long>(Arrays.asList(items), support);
> +        transactionSet.add(transaction);
> +      }
> +    } else {
> +      vInt.readFields(in);
> +      nodes = vInt.get();
> +      attribute = new int[nodes];
> +      nodeCount = new long[nodes];
> +      childCount = new int[nodes];
> +      nodeChildren = new int[nodes][];
> +      for (int i = 0; i < nodes; i++) {
> +        vInt.readFields(in);
> +        attribute[i] = vInt.get();
> +        vLong.readFields(in);
> +        nodeCount[i] = vLong.get();
> +        vInt.readFields(in);
> +        childCount[i] = vInt.get();
> +        nodeChildren[i] = new int[childCount[i]];
> +        for (int j = 0, k = childCount[i]; j < k; j++) {
> +          vInt.readFields(in);
> +          nodeChildren[i][j] = vInt.get();
> +        }
> +      }
> +    }
> +  }
> +
> +  @Override
> +  public void write(DataOutput out) throws IOException {
> +    out.writeBoolean(representedAsList);
> +    VIntWritable vInt = new VIntWritable();
> +    VLongWritable vLong = new VLongWritable();
> +    if (representedAsList) {
> +      vInt.set(transactionSet.size());
> +      vInt.write(out);
> +      for (int i = 0, j = transactionSet.size(); i < j; i++) {
> +        Pair<List<Integer>, Long> transaction = transactionSet.get(i);
> +
> +        vLong.set(transaction.getSecond().longValue());
> +        vLong.write(out);
> +
> +        vInt.set(transaction.getFirst().size());
> +        vInt.write(out);
> +
> +        for (Integer item : transaction.getFirst()) {
> +          vInt.set(item);
> +          vInt.write(out);
> +        }
> +      }
> +    } else {
> +      vInt.set(nodes);
> +      vInt.write(out);
> +      for (int i = 0; i < nodes; i++) {
> +        vInt.set(attribute[i]);
> +        vInt.write(out);
> +        vLong.set(nodeCount[i]);
> +        vLong.write(out);
> +        vInt.set(childCount[i]);
> +        vInt.write(out);
> +        for (int j = 0, k = childCount[i]; j < k; j++) {
> +          vInt.set(nodeChildren[i][j]);
> +          vInt.write(out);
> +        }
> +      }
> +    }
> +  }
> +
> +  private int createNode(int parentNodeId, int attributeValue, long count) {
> +    if (nodes >= this.attribute.length) {
> +      resize();
> +    }
> +
> +    childCount[nodes] = 0;
> +    this.attribute[nodes] = attributeValue;
> +    nodeCount[nodes] = count;
> +    if (nodeChildren[nodes] == null) {
> +      nodeChildren[nodes] = new int[DEFAULT_CHILDREN_INITIAL_SIZE];
> +    }
> +
> +    int childNodeId = nodes++;
> +    addChild(parentNodeId, childNodeId);
> +    return childNodeId;
> +  }
> +
> +  private int createRootNode() {
> +    childCount[nodes] = 0;
> +    attribute[nodes] = -1;
> +    nodeCount[nodes] = 0;
> +    if (nodeChildren[nodes] == null) {
> +      nodeChildren[nodes] = new int[DEFAULT_CHILDREN_INITIAL_SIZE];
> +    }
> +    int childNodeId = nodes++;
> +    return childNodeId;
> +  }
> +
> +  private void resize() {
> +    int size = (int) (GROWTH_RATE * nodes);
> +    if (size < DEFAULT_INITIAL_SIZE) {
> +      size = DEFAULT_INITIAL_SIZE;
> +    }
> +
> +    int[] oldChildCount = childCount;
> +    int[] oldAttribute = attribute;
> +    long[] oldnodeCount = nodeCount;
> +    int[][] oldNodeChildren = nodeChildren;
> +
> +    childCount = new int[size];
> +    attribute = new int[size];
> +    nodeCount = new long[size];
> +    nodeChildren = new int[size][];
> +
> +    System.arraycopy(oldChildCount, 0, this.childCount, 0, nodes);
> +    System.arraycopy(oldAttribute, 0, this.attribute, 0, nodes);
> +    System.arraycopy(oldnodeCount, 0, this.nodeCount, 0, nodes);
> +    System.arraycopy(oldNodeChildren, 0, this.nodeChildren, 0, nodes);
> +  }
> +
> +  private void resizeChildren(int nodeId) {
> +    int length = childCount[nodeId];
> +    int size = (int) (GROWTH_RATE * length);
> +    if (size < DEFAULT_CHILDREN_INITIAL_SIZE) {
> +      size = DEFAULT_CHILDREN_INITIAL_SIZE;
> +    }
> +    int[] oldNodeChildren = nodeChildren[nodeId];
> +    nodeChildren[nodeId] = new int[size];
> +    System.arraycopy(oldNodeChildren, 0, this.nodeChildren[nodeId], 0, length);
> +  }
> +}
>
> Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java?rev=896922&view=auto
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java (added)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java Thu Jan  7 16:45:37 2010
> @@ -0,0 +1,46 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.mahout.fpm.pfpgrowth.convertors;
> +
> +import org.apache.hadoop.io.Writable;
> +import org.apache.hadoop.mapreduce.Reducer;
> +
> +public class ContextStatusUpdater<IK extends Writable, IV extends Writable,
> +    K extends Writable, V extends Writable> implements StatusUpdater {
> +
> +  private static final long PERIOD = 10000; // Update every 10 seconds
> +
> +  private final Reducer<IK, IV, K, V>.Context context;
> +
> +  private long time = System.currentTimeMillis();
> +
> +  public ContextStatusUpdater(Reducer<IK, IV, K, V>.Context context) {
> +    this.context = context;
> +  }
> +
> +  @Override
> +  public void update(String status) {
> +    long curTime = System.currentTimeMillis();
> +    if (curTime - time > PERIOD && context != null) {
> +      time = curTime;
> +      context.setStatus("Processing FPTree: " + status);
> +    }
> +
> +  }
> +
> +}
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java Thu Jan  7 16:45:37 2010
> @@ -25,8 +25,8 @@
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
>
> -public class ContextWriteOutputCollector<IK extends Writable, IV extends Writable, K extends Writable, V extends Writable>
> -    implements OutputCollector<K, V> {
> +public class ContextWriteOutputCollector<IK extends Writable, IV extends Writable,
> +    K extends Writable, V extends Writable> implements OutputCollector<K, V> {
>
>   private static final Logger log = LoggerFactory
>       .getLogger(ContextWriteOutputCollector.class);
>
> Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java?rev=896922&view=auto
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java (added)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java Thu Jan  7 16:45:37 2010
> @@ -0,0 +1,23 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.mahout.fpm.pfpgrowth.convertors;
> +
> +public interface StatusUpdater {
> +
> +  void update(String status);
> +}
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java Thu Jan  7 16:45:37 2010
> @@ -19,40 +19,46 @@
>
>  import java.io.IOException;
>  import java.util.ArrayList;
> +import java.util.Collections;
>  import java.util.List;
>  import java.util.Map;
> +import java.util.PriorityQueue;
> +
>  import org.apache.hadoop.mapred.OutputCollector;
>  import org.apache.mahout.common.Pair;
>  import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FrequentPatternMaxHeap;
>  import org.apache.mahout.fpm.pfpgrowth.fpgrowth.Pattern;
>
> -public final class TopKPatternsOutputConvertor<A> implements
> +public final class TopKPatternsOutputConvertor<A extends Comparable<? super A>> implements
>     OutputCollector<Integer, FrequentPatternMaxHeap> {
>
>   private OutputCollector<A, List<Pair<List<A>, Long>>> collector = null;
>
>   private Map<Integer, A> reverseMapping = null;
>
> -  public TopKPatternsOutputConvertor(
> -      OutputCollector<A, List<Pair<List<A>, Long>>> collector,
> +  public TopKPatternsOutputConvertor(OutputCollector<A, List<Pair<List<A>, Long>>> collector,
>       Map<Integer, A> reverseMapping) {
>     this.collector = collector;
>     this.reverseMapping = reverseMapping;
>   }
>
>   @Override
> -  public void collect(Integer key, FrequentPatternMaxHeap value)
> -      throws IOException {
> +  public void collect(Integer key, FrequentPatternMaxHeap value) throws IOException {
>     List<Pair<List<A>, Long>> perAttributePatterns = new ArrayList<Pair<List<A>, Long>>();
> -    for (Pattern itemSet : value.getHeap()) {
> +    PriorityQueue<Pattern> t = value.getHeap();
> +    while (t.size() > 0) {
> +      Pattern itemSet = t.poll();
>       List<A> frequentPattern = new ArrayList<A>();
>       for (int j = 0; j < itemSet.length(); j++) {
>         frequentPattern.add(reverseMapping.get(itemSet.getPattern()[j]));
>       }
> -      Pair<List<A>, Long> returnItemSet = new Pair<List<A>, Long>(
> -          frequentPattern, itemSet.support());
> +      Collections.sort(frequentPattern);
> +
> +      Pair<List<A>, Long> returnItemSet = new Pair<List<A>, Long>(frequentPattern, itemSet.support());
>       perAttributePatterns.add(returnItemSet);
>     }
> +    Collections.reverse(perAttributePatterns);
> +
>     collector.collect(reverseMapping.get(key), perAttributePatterns);
>   }
>  }
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java Thu Jan  7 16:45:37 2010
> @@ -21,14 +21,16 @@
>  import java.util.List;
>  import java.util.Map;
>
> -public class TransactionIterator<AP> implements Iterator<int[]> {
> +import org.apache.mahout.common.Pair;
> +
> +public class TransactionIterator<AP> implements Iterator<Pair<int[], Long>> {
>   private Map<AP, Integer> attributeIdMapping = null;
>
> -  private Iterator<List<AP>> iterator = null;
> +  private Iterator<Pair<List<AP>, Long>> iterator = null;
>
>   private int[] transactionBuffer = null;
>
> -  public TransactionIterator(Iterator<List<AP>> iterator,
> +  public TransactionIterator(Iterator<Pair<List<AP>, Long>> iterator,
>       Map<AP, Integer> attributeIdMapping) {
>     this.attributeIdMapping = attributeIdMapping;
>     this.iterator = iterator;
> @@ -41,18 +43,19 @@
>   }
>
>   @Override
> -  public final int[] next() {
> -    List<AP> transaction = iterator.next();
> +  public final Pair<int[], Long> next() {
> +    Pair<List<AP>, Long> transaction = iterator.next();
>     int index = 0;
> -    for (AP Attribute : transaction) {
> -      if (attributeIdMapping.containsKey(Attribute)) {
> -        transactionBuffer[index++] = attributeIdMapping.get(Attribute);
> +
> +    for (AP attribute : transaction.getFirst()) {
> +      if (attributeIdMapping.containsKey(attribute)) {
> +        transactionBuffer[index++] = attributeIdMapping.get(attribute);
>       }
>     }
>
>     int[] transactionList = new int[index];
>     System.arraycopy(transactionBuffer, 0, transactionList, 0, index);
> -    return transactionList;
> +    return new Pair<int[], Long>(transactionList, transaction.getSecond());
>
>   }
>
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java Thu Jan  7 16:45:37 2010
> @@ -42,7 +42,7 @@
>   @Override
>   public void collect(Integer key, List<Pair<List<Integer>, Long>> value)
>       throws IOException {
> -    String StringKey = featureReverseMap.get(key);
> +    String stringKey = featureReverseMap.get(key);
>     List<Pair<List<String>, Long>> stringValues = new ArrayList<Pair<List<String>, Long>>();
>     for (Pair<List<Integer>, Long> e : value) {
>       List<String> pattern = new ArrayList<String>();
> @@ -53,7 +53,7 @@
>     }
>
>     collector
> -        .collect(new Text(StringKey), new TopKStringPatterns(stringValues));
> +        .collect(new Text(stringKey), new TopKStringPatterns(stringValues));
>   }
>
>  }
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java Thu Jan  7 16:45:37 2010
> @@ -25,7 +25,7 @@
>  public final class IntegerTupleIterator implements Iterator<List<Integer>> {
>
>   private Iterator<IntegerTuple> iterator = null;
> -
> +
>   public IntegerTupleIterator(Iterator<IntegerTuple> iterator) {
>     this.iterator = iterator;
>   }
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java Thu Jan  7 16:45:37 2010
> @@ -55,10 +55,12 @@
>     Pair<List<String>, Long> myItem = null;
>     Pair<List<String>, Long> otherItem = null;
>     for (int i = 0; i < heapSize; i++) {
> -      if (myItem == null && myIterator.hasNext())
> +      if (myItem == null && myIterator.hasNext()) {
>         myItem = myIterator.next();
> -      if (otherItem == null && otherIterator.hasNext())
> +      }
> +      if (otherItem == null && otherIterator.hasNext()) {
>         otherItem = otherIterator.next();
> +      }
>       if (myItem != null && otherItem != null) {
>         int cmp = myItem.getSecond().compareTo(otherItem.getSecond());
>         if (cmp == 0) {
> @@ -89,8 +91,9 @@
>       } else if (otherItem != null) {
>         patterns.add(otherItem);
>         otherItem = null;
> -      } else
> +      } else {
>         break;
> +      }
>     }
>     return new TopKStringPatterns(patterns);
>   }
>
>
>

Re: svn commit: r896922 [1/3] - in /lucene/mahout/trunk: core/src/main/java/org/apache/mahout/common/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ core/src/main/java/org/apache/mah

Posted by deneche abdelhakim <ad...@gmail.com>.
The build is successful now, thanks =D

On Fri, Jan 8, 2010 at 9:23 AM, Robin Anil <ro...@gmail.com> wrote:
> Try Now
>

Re: svn commit: r896922 [1/3] - in /lucene/mahout/trunk: core/src/main/java/org/apache/mahout/common/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ core/src/main/java/org/apache/mah

Posted by Robin Anil <ro...@gmail.com>.
Try Now

Re: svn commit: r896922 [1/3] - in /lucene/mahout/trunk: core/src/main/java/org/apache/mahout/common/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ core/src/main/java/org/apache/mah

Posted by Robin Anil <ro...@gmail.com>.
Let me take a look. It seems the reducer was not checked in.


On Fri, Jan 8, 2010 at 1:40 PM, deneche abdelhakim <ad...@gmail.com> wrote:

> Hi Robin,
>
> I'm getting a build failure with "mvn clean install":
>
> [INFO] Compilation failure
>
> /home/hakim/mahout-trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleGrouper.java:[89,24]
> cannot find symbol
> symbol  : class KeyBasedStringTupleReducer
> location: class
> org.apache.mahout.fpm.pfpgrowth.example.dataset.KeyBasedStringTupleGrouper
>
>
>
> On Thu, Jan 7, 2010 at 5:48 PM,  <ro...@apache.org> wrote:
> > Author: robinanil
> > Date: Thu Jan  7 16:45:37 2010
> > New Revision: 896922
> >
> > URL: http://svn.apache.org/viewvc?rev=896922&view=rev
> > Log:
> > MAHOUT-221 FP-Bonsai Pruning and Transaction Compaction
> >
> > Added:
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java
> >
>  lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/TransactionTreeTest.java
> >
>  lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/
> >
>  lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/DeliciousTagsExample.java
> >
>  lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/
> >
>  lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleCombiner.java
> >
>  lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleGrouper.java
> >
>  lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleMapper.java
> > Modified:
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPGrowth.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTree.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTreeDepthCache.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FrequentPatternMaxHeap.java
> >
>  lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/Pattern.java
> >
>  lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthTest.java
> >
>  lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
> >
>  lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthJob.java
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
> Thu Jan  7 16:45:37 2010
> > @@ -22,31 +22,31 @@
> >  import java.util.List;
> >  import java.util.regex.Pattern;
> >
> > -public class StringRecordIterator implements Iterator<List<String>> {
> > -
> > +public class StringRecordIterator implements
> Iterator<Pair<List<String>,Long>> {
> > +
> >   private final Iterator<String> lineIterator;
> >   private Pattern splitter = null;
> > -
> > +
> >   public StringRecordIterator(FileLineIterable iterable, String pattern)
> {
> >     this.lineIterator = iterable.iterator();
> >     this.splitter = Pattern.compile(pattern);
> >   }
> > -
> > +
> >   @Override
> >   public boolean hasNext() {
> >     return lineIterator.hasNext();
> >   }
> > -
> > +
> >   @Override
> > -  public List<String> next() {
> > +  public Pair<List<String>,Long> next() {
> >     String line = lineIterator.next();
> >     String[] items = splitter.split(line);
> > -    return Arrays.asList(items);
> > +    return new Pair<List<String>,Long>(Arrays.asList(items),
> Long.valueOf(1));
> >   }
> > -
> > +
> >   @Override
> >   public void remove() {
> >     lineIterator.remove();
> >   }
> > -
> > -}
> > +
> > +}
> > \ No newline at end of file
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
> Thu Jan  7 16:45:37 2010
> > @@ -17,34 +17,32 @@
> >
> >  package org.apache.mahout.fpm.pfpgrowth;
> >
> > +import java.io.IOException;
> > +import java.util.ArrayList;
> > +import java.util.List;
> > +
> >  import org.apache.hadoop.io.Text;
> >  import org.apache.hadoop.mapreduce.Mapper;
> >  import org.apache.mahout.common.Pair;
> >  import
> org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
> >
> > -import java.io.IOException;
> > -import java.util.ArrayList;
> > -import java.util.List;
> > -
> >  /**
> >  *
> > - * {@link AggregatorMapper} outputs the pattern for each item in the
> pattern, so
> > - * that reducer can group them and select the top K frequent patterns
> > + * {@link AggregatorMapper} outputs the pattern for each item in the
> pattern, so that reducer can group them
> > + * and select the top K frequent patterns
> >  *
> >  */
> > -public class AggregatorMapper extends
> > -    Mapper<Text, TopKStringPatterns, Text, TopKStringPatterns> {
> > +public class AggregatorMapper extends Mapper<Text, TopKStringPatterns,
> Text, TopKStringPatterns> {
> >
> >   @Override
> > -  protected void map(Text key, TopKStringPatterns values, Context
> context)
> > -      throws IOException, InterruptedException {
> > +  protected void map(Text key, TopKStringPatterns values, Context
> context) throws IOException,
> > +      InterruptedException {
> >     for (Pair<List<String>, Long> pattern : values.getPatterns()) {
> >       for (String item : pattern.getFirst()) {
> >         List<Pair<List<String>, Long>> patternSingularList = new
> ArrayList<Pair<List<String>, Long>>();
> >         patternSingularList.add(pattern);
> > -        context.setStatus("Aggregator Mapper:Grouping Patterns for
> "+item);
> > -        context.write(new Text(item),
> > -            new TopKStringPatterns(patternSingularList));
> > +        context.setStatus("Aggregator Mapper:Grouping Patterns for " +
> item);
> > +        context.write(new Text(item), new
> TopKStringPatterns(patternSingularList));
> >       }
> >     }
> >
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
> Thu Jan  7 16:45:37 2010
> > @@ -17,11 +17,12 @@
> >
> >  package org.apache.mahout.fpm.pfpgrowth;
> >
> > +import java.io.IOException;
> > +
> >  import org.apache.hadoop.io.Text;
> >  import org.apache.hadoop.mapreduce.Reducer;
> >  import org.apache.mahout.common.Parameters;
> >  import
> org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
> > -import java.io.IOException;
> >
> >  /**
> >  *
> >
> > Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java?rev=896922&view=auto
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java
> (added)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java
> Thu Jan  7 16:45:37 2010
> > @@ -0,0 +1,70 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.mahout.fpm.pfpgrowth;
> > +
> > +import java.util.Iterator;
> > +import java.util.List;
> > +
> > +import org.apache.mahout.common.Pair;
> > +
> > +public final class MultiTransactionTreeIterator implements
> Iterator<List<Integer>> {
> > +
> > +  private Iterator<Pair<List<Integer>, Long>> pIterator = null;
> > +
> > +  private Pair<List<Integer>, Long> currentPattern = null;
> > +
> > +  private long currentCount = 0;
> > +
> > +  public MultiTransactionTreeIterator(Iterator<Pair<List<Integer>,
> Long>> iterator) {
> > +    this.pIterator = iterator;
> > +
> > +    if (pIterator.hasNext()) {
> > +      currentPattern = pIterator.next();
> > +      currentCount = 0;
> > +    } else {
> > +      pIterator = null;
> > +    }
> > +
> > +  }
> > +
> > +  @Override
> > +  public boolean hasNext() {
> > +    return pIterator != null;
> > +  }
> > +
> > +  @Override
> > +  public List<Integer> next() {
> > +    List<Integer> returnable = currentPattern.getFirst();
> > +    currentCount++;
> > +    if (currentCount == currentPattern.getSecond().longValue()) {
> > +      if (pIterator.hasNext()) {
> > +        currentPattern = pIterator.next();
> > +        currentCount = 0;
> > +      } else {
> > +        pIterator = null;
> > +      }
> > +    }
> > +    return returnable;
> > +  }
> > +
> > +  @Override
> > +  public void remove() {
> > +    throw new UnsupportedOperationException();
> > +  }
> > +
> > +}
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
> Thu Jan  7 16:45:37 2010
> > @@ -42,7 +42,6 @@
> >  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> >  import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
> >  import org.apache.hadoop.util.GenericsUtil;
> > -import org.apache.mahout.common.IntegerTuple;
> >  import org.apache.mahout.common.Pair;
> >  import org.apache.mahout.common.Parameters;
> >  import
> org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
> > @@ -52,11 +51,11 @@
> >
> >  /**
> >  *
> > - * Parallel FP Growth Driver Class. Runs each stage of PFPGrowth as
> described in
> > - * the paper http://infolab.stanford.edu/~echang/recsys08-69.pdf
> > + * Parallel FP Growth Driver Class. Runs each stage of PFPGrowth as
> described in the paper
> > + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
> >  *
> >  */
> > -public class PFPGrowth {
> > +public final class PFPGrowth {
> >   public static final Pattern SPLITTER = Pattern.compile("[ ,\t]*[,|\t][
> ,\t]*");
> >
> >   private static final Logger log =
> LoggerFactory.getLogger(PFPGrowth.class);
> > @@ -73,12 +72,11 @@
> >    * @return Deserialized Feature Frequency List
> >    * @throws IOException
> >    */
> > -  public static List<Pair<String, Long>> deserializeList(Parameters
> params,
> > -      String key, Configuration conf) throws IOException {
> > +  public static List<Pair<String, Long>> deserializeList(Parameters
> params, String key, Configuration conf)
> > +      throws IOException {
> >     List<Pair<String, Long>> list = new ArrayList<Pair<String, Long>>();
> > -    conf.set(
> > -            "io.serializations",
> > -
>  "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> > +    conf.set("io.serializations",
> "org.apache.hadoop.io.serializer.JavaSerialization,"
> > +        + "org.apache.hadoop.io.serializer.WritableSerialization");
> >
> >     DefaultStringifier<List<Pair<String, Long>>> listStringifier = new
> DefaultStringifier<List<Pair<String, Long>>>(
> >         conf, GenericsUtil.getClass(list));
> > @@ -89,8 +87,8 @@
> >   }
> >
> >   /**
> > -   * Generates the gList(Group ID Mapping of Various frequent Features)
> Map from
> > -   * the corresponding serialized representation
> > +   * Generates the gList(Group ID Mapping of Various frequent Features)
> Map from the corresponding serialized
> > +   * representation
> >    *
> >    * @param params
> >    * @param key
> > @@ -98,16 +96,14 @@
> >    * @return Deserialized Group List
> >    * @throws IOException
> >    */
> > -  public static Map<String, Long> deserializeMap(Parameters params,
> String key,
> > -      Configuration conf) throws IOException {
> > +  public static Map<String, Long> deserializeMap(Parameters params,
> String key, Configuration conf)
> > +      throws IOException {
> >     Map<String, Long> map = new HashMap<String, Long>();
> > -    conf
> > -        .set(
> > -            "io.serializations",
> > -
>  "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> > +    conf.set("io.serializations",
> "org.apache.hadoop.io.serializer.JavaSerialization,"
> > +        + "org.apache.hadoop.io.serializer.WritableSerialization");
> >
> > -    DefaultStringifier<Map<String, Long>> mapStringifier = new
> DefaultStringifier<Map<String, Long>>(
> > -        conf, GenericsUtil.getClass(map));
> > +    DefaultStringifier<Map<String, Long>> mapStringifier = new
> DefaultStringifier<Map<String, Long>>(conf,
> > +        GenericsUtil.getClass(map));
> >     String gListString = mapStringifier.toString(map);
> >     gListString = params.get(key, gListString);
> >     map = mapStringifier.fromString(gListString);
> > @@ -115,33 +111,30 @@
> >   }
> >
> >   /**
> > -   * read the feature frequency List which is built at the end of the
> Parallel
> > -   * counting job
> > +   * read the feature frequency List which is built at the end of the
> Parallel counting job
> >    *
> >    * @param params
> >    * @return Feature Frequency List
> >    * @throws IOException
> >    */
> > -  public static List<Pair<String, Long>> readFList(Parameters params)
> > -      throws IOException {
> > +  public static List<Pair<String, Long>> readFList(Parameters params)
> throws IOException {
> >     Writable key = new Text();
> >     LongWritable value = new LongWritable();
> >     int minSupport = Integer.valueOf(params.get("minSupport", "3"));
> >     Configuration conf = new Configuration();
> >
> > -    FileSystem fs = FileSystem.get(new Path(params.get("output")
> > -        + "/parallelcounting").toUri(), conf);
> > -    FileStatus[] outputFiles = fs.globStatus(new
> Path(params.get("output")
> > -        + "/parallelcounting/part-*"));
> > +    FileSystem fs = FileSystem.get(new Path(params.get("output") +
> "/parallelcounting").toUri(), conf);
> > +    FileStatus[] outputFiles = fs.globStatus(new
> Path(params.get("output") + "/parallelcounting/part-*"));
> >
> > -    PriorityQueue<Pair<String, Long>> queue = new
> PriorityQueue<Pair<String, Long>>(
> > -        11, new Comparator<Pair<String, Long>>() {
> > +    PriorityQueue<Pair<String, Long>> queue = new
> PriorityQueue<Pair<String, Long>>(11,
> > +        new Comparator<Pair<String, Long>>() {
> >
> >           @Override
> >           public int compare(Pair<String, Long> o1, Pair<String, Long>
> o2) {
> >             int ret = o2.getSecond().compareTo(o1.getSecond());
> > -            if (ret != 0)
> > +            if (ret != 0) {
> >               return ret;
> > +            }
> >             return o1.getFirst().compareTo(o2.getFirst());
> >           }
> >
> > @@ -151,14 +144,16 @@
> >       SequenceFile.Reader reader = new SequenceFile.Reader(fs, path,
> conf);
> >       // key is feature value is count
> >       while (reader.next(key, value)) {
> > -        if (value.get() < minSupport)
> > +        if (value.get() < minSupport) {
> >           continue;
> > +        }
> >         queue.add(new Pair<String, Long>(key.toString(), value.get()));
> >       }
> >     }
> >     List<Pair<String, Long>> fList = new ArrayList<Pair<String, Long>>();
> > -    while (queue.isEmpty() == false)
> > +    while (queue.isEmpty() == false) {
> >       fList.add(queue.poll());
> > +    }
> >     return fList;
> >   }
> >
> > @@ -169,15 +164,13 @@
> >    * @return List of TopK patterns for each string frequent feature
> >    * @throws IOException
> >    */
> > -  public static List<Pair<String, TopKStringPatterns>>
> readFrequentPattern(
> > -      Parameters params) throws IOException {
> > +  public static List<Pair<String, TopKStringPatterns>>
> readFrequentPattern(Parameters params)
> > +      throws IOException {
> >
> >     Configuration conf = new Configuration();
> >
> > -    FileSystem fs = FileSystem.get(new Path(params.get("output")
> > -        + "/frequentPatterns").toUri(), conf);
> > -    FileStatus[] outputFiles = fs.globStatus(new
> Path(params.get("output")
> > -        + "/frequentPatterns/part-*"));
> > +    FileSystem fs = FileSystem.get(new Path(params.get("output") +
> "/frequentPatterns").toUri(), conf);
> > +    FileStatus[] outputFiles = fs.globStatus(new
> Path(params.get("output") + "/frequentPatterns/part-*"));
> >
> >     List<Pair<String, TopKStringPatterns>> ret = new
> ArrayList<Pair<String, TopKStringPatterns>>();
> >     for (FileStatus fileStatus : outputFiles) {
> > @@ -189,42 +182,42 @@
> >
> >   /**
> >    *
> > -   * @param params params should contain input and output locations as a
> string
> > -   *        value, the additional parameters include minSupport(3),
> > -   *        maxHeapSize(50), numGroups(1000)
> > +   * @param params params should contain input and output locations as a
> string value, the additional
> > +   *        parameters include minSupport(3), maxHeapSize(50),
> numGroups(1000)
> >    * @throws IOException
> >    * @throws ClassNotFoundException
> >    * @throws InterruptedException
> >    */
> > -  public static void runPFPGrowth(Parameters params) throws IOException,
> > -      InterruptedException, ClassNotFoundException {
> > +  public static void runPFPGrowth(Parameters params) throws IOException,
> InterruptedException,
> > +      ClassNotFoundException {
> >     startParallelCounting(params);
> >     startGroupingItems(params);
> > +    startTransactionSorting(params);
> >     startParallelFPGrowth(params);
> >     startAggregating(params);
> >   }
> >
> >   /**
> > -   * Run the aggregation Job to aggregate the different TopK patterns
> and group
> > -   * each Pattern by the features present in it and thus calculate the
> final Top
> > -   * K frequent Patterns for each feature
> > +   * Run the aggregation Job to aggregate the different TopK patterns
> and group each Pattern by the features
> > +   * present in it and thus calculate the final Top K frequent Patterns
> for each feature
> >    *
> >    * @param params
> >    * @throws IOException
> >    * @throws InterruptedException
> >    * @throws ClassNotFoundException
> >    */
> > -  public static void startAggregating(Parameters params) throws
> IOException,
> > -      InterruptedException, ClassNotFoundException {
> > +  public static void startAggregating(Parameters params) throws
> IOException, InterruptedException,
> > +      ClassNotFoundException {
> >
> >     Configuration conf = new Configuration();
> >     params.set("fList", "");
> >     params.set("gList", "");
> >     conf.set("pfp.parameters", params.toString());
> > +    conf.set("mapred.compress.map.output", "true");
> > +    conf.set("mapred.output.compression.type", "BLOCK");
> >
> >     String input = params.get("output") + "/fpgrowth";
> > -    Job job = new Job(conf, "PFP Aggregator Driver running over input: "
> > -        + input);
> > +    Job job = new Job(conf, "PFP Aggregator Driver running over input: "
> + input);
> >     job.setJarByClass(PFPGrowth.class);
> >
> >     job.setOutputKeyClass(Text.class);
> > @@ -248,8 +241,7 @@
> >   }
> >
> >   /**
> > -   * Group the given Features into g groups as defined by the numGroups
> > -   * parameter in params
> > +   * Group the given Features into g groups as defined by the numGroups
> parameter in params
> >    *
> >    * @param params
> >    * @throws IOException
> > @@ -261,14 +253,15 @@
> >
> >     Map<String, Long> gList = new HashMap<String, Long>();
> >     long maxPerGroup = fList.size() / numGroups;
> > -    if (fList.size() != maxPerGroup * numGroups)
> > +    if (fList.size() != maxPerGroup * numGroups) {
> >       maxPerGroup++;
> > +    }
> >
> >     long i = 0;
> >     long groupID = 0;
> >     for (Pair<String, Long> featureFreq : fList) {
> >       String feature = featureFreq.getFirst();
> > -      if (i / (maxPerGroup) == groupID) {
> > +      if (i / maxPerGroup == groupID) {
> >         gList.put(feature, groupID);
> >       } else {
> >         groupID++;
> > @@ -291,15 +284,17 @@
> >    * @throws InterruptedException
> >    * @throws ClassNotFoundException
> >    */
> > -  public static void startParallelCounting(Parameters params)
> > -      throws IOException, InterruptedException, ClassNotFoundException {
> > +  public static void startParallelCounting(Parameters params) throws
> IOException, InterruptedException,
> > +      ClassNotFoundException {
> >
> >     Configuration conf = new Configuration();
> >     conf.set("pfp.parameters", params.toString());
> >
> > +    conf.set("mapred.compress.map.output", "true");
> > +    conf.set("mapred.output.compression.type", "BLOCK");
> > +
> >     String input = params.get("input");
> > -    Job job = new Job(conf, "Parallel Counting Driver running over
> input: "
> > -        + input);
> > +    Job job = new Job(conf, "Parallel Counting Driver running over
> input: " + input);
> >     job.setJarByClass(PFPGrowth.class);
> >
> >     job.setOutputKeyClass(Text.class);
> > @@ -325,26 +320,71 @@
> >   }
> >
> >   /**
> > -   * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K
> features of
> > -   * group dependent shards
> > +   * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K
> features of group dependent shards
> >    *
> >    * @param params
> >    * @throws IOException
> >    * @throws InterruptedException
> >    * @throws ClassNotFoundException
> >    */
> > -  public static void startParallelFPGrowth(Parameters params)
> > -      throws IOException, InterruptedException, ClassNotFoundException {
> > +  public static void startTransactionSorting(Parameters params) throws
> IOException, InterruptedException,
> > +      ClassNotFoundException {
> >
> >     Configuration conf = new Configuration();
> > +    String gList = params.get("gList");
> > +    params.set("gList", "");
> >     conf.set("pfp.parameters", params.toString());
> > -
> > +    conf.set("mapred.compress.map.output", "true");
> > +    conf.set("mapred.output.compression.type", "BLOCK");
> >     String input = params.get("input");
> > +    Job job = new Job(conf, "PFP Transaction Sorting running over input"
> + input);
> > +    job.setJarByClass(PFPGrowth.class);
> > +
> > +    job.setMapOutputKeyClass(LongWritable.class);
> > +    job.setMapOutputValueClass(TransactionTree.class);
> > +
> > +    job.setOutputKeyClass(LongWritable.class);
> > +    job.setOutputValueClass(TransactionTree.class);
> > +
> > +    FileInputFormat.addInputPath(job, new Path(input));
> > +    Path outPath = new Path(params.get("output") + "/sortedoutput");
> > +    FileOutputFormat.setOutputPath(job, outPath);
> > +
> > +    FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
> > +    if (dfs.exists(outPath)) {
> > +      dfs.delete(outPath, true);
> > +    }
> > +
> > +    job.setInputFormatClass(TextInputFormat.class);
> > +    job.setMapperClass(TransactionSortingMapper.class);
> > +    job.setReducerClass(TransactionSortingReducer.class);
> > +    job.setOutputFormatClass(SequenceFileOutputFormat.class);
> > +
> > +    job.waitForCompletion(true);
> > +    params.set("gList", gList);
> > +  }
> > +
> > +  /**
> > +   * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K
> features of group dependent shards
> > +   *
> > +   * @param params
> > +   * @throws IOException
> > +   * @throws InterruptedException
> > +   * @throws ClassNotFoundException
> > +   */
> > +  public static void startParallelFPGrowth(Parameters params) throws
> IOException, InterruptedException,
> > +      ClassNotFoundException {
> > +
> > +    Configuration conf = new Configuration();
> > +    conf.set("pfp.parameters", params.toString());
> > +    conf.set("mapred.compress.map.output", "true");
> > +    conf.set("mapred.output.compression.type", "BLOCK");
> > +    String input = params.get("output") + "/sortedoutput";
> >     Job job = new Job(conf, "PFP Growth Driver running over input" +
> input);
> >     job.setJarByClass(PFPGrowth.class);
> >
> >     job.setMapOutputKeyClass(LongWritable.class);
> > -    job.setMapOutputValueClass(IntegerTuple.class);
> > +    job.setMapOutputValueClass(TransactionTree.class);
> >
> >     job.setOutputKeyClass(Text.class);
> >     job.setOutputValueClass(TopKStringPatterns.class);
> > @@ -358,8 +398,9 @@
> >       dfs.delete(outPath, true);
> >     }
> >
> > -    job.setInputFormatClass(TextInputFormat.class);
> > +    job.setInputFormatClass(SequenceFileInputFormat.class);
> >     job.setMapperClass(ParallelFPGrowthMapper.class);
> > +    job.setCombinerClass(ParallelFPGrowthCombiner.class);
> >     job.setReducerClass(ParallelFPGrowthReducer.class);
> >     job.setOutputFormatClass(SequenceFileOutputFormat.class);
> >
> > @@ -374,11 +415,9 @@
> >    * @return Serialized String representation of List
> >    * @throws IOException
> >    */
> > -  private static String serializeList(List<Pair<String, Long>> list,
> > -      Configuration conf) throws IOException {
> > -    conf.set(
> > -            "io.serializations",
> > -
>  "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> > +  private static String serializeList(List<Pair<String, Long>> list,
> Configuration conf) throws IOException {
> > +    conf.set("io.serializations",
> "org.apache.hadoop.io.serializer.JavaSerialization,"
> > +        + "org.apache.hadoop.io.serializer.WritableSerialization");
> >     DefaultStringifier<List<Pair<String, Long>>> listStringifier = new
> DefaultStringifier<List<Pair<String, Long>>>(
> >         conf, GenericsUtil.getClass(list));
> >     return listStringifier.toString(list);
> > @@ -392,13 +431,11 @@
> >    * @return Serialized String representation of the GList Map
> >    * @throws IOException
> >    */
> > -  private static String serializeMap(Map<String, Long> map,
> Configuration conf)
> > -      throws IOException {
> > -    conf.set(
> > -            "io.serializations",
> > -
>  "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> > -    DefaultStringifier<Map<String, Long>> mapStringifier = new
> DefaultStringifier<Map<String, Long>>(
> > -        conf, GenericsUtil.getClass(map));
> > +  private static String serializeMap(Map<String, Long> map,
> Configuration conf) throws IOException {
> > +    conf.set("io.serializations",
> "org.apache.hadoop.io.serializer.JavaSerialization,"
> > +        + "org.apache.hadoop.io.serializer.WritableSerialization");
> > +    DefaultStringifier<Map<String, Long>> mapStringifier = new
> DefaultStringifier<Map<String, Long>>(conf,
> > +        GenericsUtil.getClass(map));
> >     return mapStringifier.toString(map);
> >   }
> >  }
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
> Thu Jan  7 16:45:37 2010
> > @@ -1,5 +1,5 @@
> >  /**
> > - * Licensed to the Apache Software Foundation (ASF) under one or more
> > +w * Licensed to the Apache Software Foundation (ASF) under one or more
> >  * contributor license agreements.  See the NOTICE file distributed with
> >  * this work for additional information regarding copyright ownership.
> >  * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > @@ -17,46 +17,44 @@
> >
> >  package org.apache.mahout.fpm.pfpgrowth;
> >
> > +import java.io.IOException;
> > +import java.util.regex.Pattern;
> > +
> >  import org.apache.hadoop.io.LongWritable;
> >  import org.apache.hadoop.io.Text;
> >  import org.apache.hadoop.mapreduce.Mapper;
> >  import org.apache.mahout.common.Parameters;
> >
> > -import java.io.IOException;
> > -import java.util.regex.Pattern;
> > -
> >  /**
> >  *
> > - * {@link ParallelCountingMapper} maps all items in a particular
> transaction
> > - * like the way it is done in Hadoop WordCount example
> > + * {@link ParallelCountingMapper} maps all items in a particular
> transaction like the way it is done in Hadoop
> > + * WordCount example
> >  *
> >  */
> > -public class ParallelCountingMapper extends
> > -    Mapper<LongWritable, Text, Text, LongWritable> {
> > +public class ParallelCountingMapper extends Mapper<LongWritable, Text,
> Text, LongWritable> {
> >
> >   private static final LongWritable one = new LongWritable(1);
> >
> >   private Pattern splitter = null;
> >
> >   @Override
> > -  protected void map(LongWritable offset, Text input, Context context)
> > -      throws IOException, InterruptedException {
> > +  protected void map(LongWritable offset, Text input, Context context)
> throws IOException,
> > +      InterruptedException {
> >
> >     String[] items = splitter.split(input.toString());
> > -    for (String item : items){
> > -      if(item.trim().length()==0) continue;
> > -      context.setStatus("Parallel Counting Mapper: "+  item);
> > +    for (String item : items) {
> > +      if (item.trim().length() == 0) {
> > +        continue;
> > +      }
> > +      context.setStatus("Parallel Counting Mapper: " + item);
> >       context.write(new Text(item), one);
> >     }
> >   }
> >
> >   @Override
> > -  protected void setup(Context context) throws IOException,
> > -      InterruptedException {
> > +  protected void setup(Context context) throws IOException,
> InterruptedException {
> >     super.setup(context);
> > -    Parameters params =
> Parameters.fromString(context.getConfiguration().get(
> > -        "pfp.parameters", ""));
> > -    splitter = Pattern.compile(params.get("splitPattern",
> PFPGrowth.SPLITTER
> > -        .toString()));
> > +    Parameters params =
> Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
> > +    splitter = Pattern.compile(params.get("splitPattern",
> PFPGrowth.SPLITTER.toString()));
> >   }
> >  }
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
> Thu Jan  7 16:45:37 2010
> > @@ -17,23 +17,21 @@
> >
> >  package org.apache.mahout.fpm.pfpgrowth;
> >
> > +import java.io.IOException;
> > +
> >  import org.apache.hadoop.io.LongWritable;
> >  import org.apache.hadoop.io.Text;
> >  import org.apache.hadoop.mapreduce.Reducer;
> >
> > -import java.io.IOException;
> > -
> >  /**
> > - * {@link ParallelCountingReducer} sums up the item count and output the
> item
> > - * and the count This can also be used as a local Combiner. A simple
> summing
> > - * reducer
> > + * {@link ParallelCountingReducer} sums up the item count and output the
> item and the count This can also be
> > + * used as a local Combiner. A simple summing reducer
> >  */
> > -public class ParallelCountingReducer extends
> > -    Reducer<Text, LongWritable, Text, LongWritable> {
> > +public class ParallelCountingReducer extends Reducer<Text, LongWritable,
> Text, LongWritable> {
> >
> >   @Override
> > -  protected void reduce(Text key, Iterable<LongWritable> values, Context
> context)
> > -      throws IOException, InterruptedException {
> > +  protected void reduce(Text key, Iterable<LongWritable> values, Context
> context) throws IOException,
> > +      InterruptedException {
> >     long sum = 0;
> >     for (LongWritable value : values) {
> >       context.setStatus("Parallel Counting Reducer :" + key);
> >
> > Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java?rev=896922&view=auto
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java
> (added)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java
> Thu Jan  7 16:45:37 2010
> > @@ -0,0 +1,55 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.mahout.fpm.pfpgrowth;
> > +
> > +import java.io.IOException;
> > +import java.util.Iterator;
> > +import java.util.List;
> > +
> > +import org.apache.hadoop.io.LongWritable;
> > +import org.apache.hadoop.mapreduce.Reducer;
> > +import org.apache.mahout.common.Pair;
> > +
> > +/**
> > + * {@link ParallelFPGrowthCombiner} takes each group of dependent
> transactions
> > + * and\ compacts it in a TransactionTree structure
> > + */
> > +
> > +public class ParallelFPGrowthCombiner extends
> > +    Reducer<LongWritable, TransactionTree, LongWritable,
> TransactionTree> {
> > +
> > +  @Override
> > +  protected void reduce(LongWritable key, Iterable<TransactionTree>
> values,
> > +      Context context) throws IOException, InterruptedException {
> > +    TransactionTree cTree = new TransactionTree();
> > +    int count = 0;
> > +    int node = 0;
> > +    for (TransactionTree tr : values) {
> > +      Iterator<Pair<List<Integer>, Long>> it = tr.getIterator();
> > +      while (it.hasNext()) {
> > +        Pair<List<Integer>, Long> p = it.next();
> > +        node += cTree.addPattern(p.getFirst(), p.getSecond());
> > +        count++;
> > +      }
> > +    }
> > +
> > +    context.write(key, cTree.getCompressedTree());
> > +
> > +  }
> > +
> > +}
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
> Thu Jan  7 16:45:37 2010
> > @@ -17,93 +17,73 @@
> >
> >  package org.apache.mahout.fpm.pfpgrowth;
> >
> > -import org.apache.hadoop.io.LongWritable;
> > -import org.apache.hadoop.io.Text;
> > -import org.apache.hadoop.mapreduce.Mapper;
> > -import org.apache.mahout.common.IntegerTuple;
> > -import org.apache.mahout.common.Pair;
> > -import org.apache.mahout.common.Parameters;
> >  import java.io.IOException;
> > -import java.util.ArrayList;
> > -import java.util.Collections;
> >  import java.util.HashMap;
> >  import java.util.HashSet;
> > +import java.util.Iterator;
> >  import java.util.List;
> >  import java.util.Map;
> > -import java.util.Set;
> >  import java.util.Map.Entry;
> > -import java.util.regex.Pattern;
> > +import java.util.Set;
> > +
> > +import org.apache.hadoop.io.LongWritable;
> > +import org.apache.hadoop.mapreduce.Mapper;
> > +import org.apache.mahout.common.Pair;
> > +import org.apache.mahout.common.Parameters;
> >
> >  /**
> > - * {@link ParallelFPGrowthMapper} maps each transaction to all unique
> items
> > - * groups in the transaction. mapper outputs the group id as key and the
> > - * transaction as value
> > + * {@link ParallelFPGrowthMapper} maps each transaction to all unique
> items groups in the transaction. mapper
> > + * outputs the group id as key and the transaction as value
> >  *
> >  */
> >  public class ParallelFPGrowthMapper extends
> > -    Mapper<LongWritable, Text, LongWritable, IntegerTuple> {
> > -
> > -  private final Map<String, Integer> fMap = new HashMap<String,
> Integer>();
> > +    Mapper<LongWritable, TransactionTree, LongWritable, TransactionTree>
> {
> >
> >   private final Map<Integer, Long> gListInt = new HashMap<Integer,
> Long>();
> >
> > -  private Pattern splitter = null;
> > -
> >   @Override
> > -  protected void map(LongWritable offset, Text input, Context context)
> > -      throws IOException, InterruptedException {
> > -
> > -    String[] items = splitter.split(input.toString());
> > -
> > -    List<Integer> itemSet = new ArrayList<Integer>();
> > -    for (String item : items) // remove items not in the fList
> > -    {
> > -      if (fMap.containsKey(item) && item.trim().length() != 0)
> > -        itemSet.add(fMap.get(item));
> > -    }
> > -
> > -    Collections.sort(itemSet);
> > +  protected void map(LongWritable offset, TransactionTree input, Context
> context) throws IOException,
> > +      InterruptedException {
> >
> > -    Integer[] prunedItems = itemSet.toArray(new
> Integer[itemSet.size()]);
> > -
> > -    Set<Long> groups = new HashSet<Long>();
> > -    for (int j = prunedItems.length - 1; j >= 0; j--) { // generate
> group
> > -                                                        // dependent
> > -                                                        // shards
> > -      Integer item = prunedItems[j];
> > -      Long groupID = gListInt.get(item);
> > -      if (groups.contains(groupID) == false) {
> > -        Integer[] tempItems = new Integer[j + 1];
> > -        System.arraycopy(prunedItems, 0, tempItems, 0, j + 1);
> > -        context
> > -            .setStatus("Parallel FPGrowth: Generating Group Dependent
> transactions for: "
> > -                + item);
> > -        context.write(new LongWritable(groupID), new
> IntegerTuple(tempItems));
> > +    Iterator<Pair<List<Integer>, Long>> it = input.getIterator();
> > +    while (it.hasNext()) {
> > +      Pair<List<Integer>, Long> pattern = it.next();
> > +      Integer[] prunedItems = pattern.getFirst().toArray(new
> Integer[pattern.getFirst().size()]);
> > +
> > +      Set<Long> groups = new HashSet<Long>();
> > +      for (int j = prunedItems.length - 1; j >= 0; j--) { // generate
> group
> > +        // dependent
> > +        // shards
> > +        Integer item = prunedItems[j];
> > +        Long groupID = gListInt.get(item);
> > +
> > +        if (groups.contains(groupID) == false) {
> > +          Integer[] tempItems = new Integer[j + 1];
> > +          System.arraycopy(prunedItems, 0, tempItems, 0, j + 1);
> > +          context.setStatus("Parallel FPGrowth: Generating Group
> Dependent transactions for: " + item);
> > +          context.write(new LongWritable(groupID), new
> TransactionTree(tempItems, pattern.getSecond()));
> > +        }
> > +        groups.add(groupID);
> >       }
> > -      groups.add(groupID);
> >     }
> >
> >   }
> >
> >   @Override
> > -  protected void setup(Context context) throws IOException,
> > -      InterruptedException {
> > +  protected void setup(Context context) throws IOException,
> InterruptedException {
> >     super.setup(context);
> > -    Parameters params =
> Parameters.fromString(context.getConfiguration().get(
> > -        "pfp.parameters", ""));
> > +    Parameters params =
> Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
> >
> > +    Map<String, Integer> fMap = new HashMap<String, Integer>();
> >     int i = 0;
> > -    for (Pair<String, Long> e : PFPGrowth.deserializeList(params,
> "fList",
> > -        context.getConfiguration())) {
> > +    for (Pair<String, Long> e : PFPGrowth.deserializeList(params,
> "fList", context.getConfiguration())) {
> >       fMap.put(e.getFirst(), i++);
> >     }
> > -
> > -    for (Entry<String, Long> e : PFPGrowth.deserializeMap(params,
> "gList",
> > -        context.getConfiguration()).entrySet()) {
> > +
> > +    for (Entry<String, Long> e : PFPGrowth.deserializeMap(params,
> "gList", context.getConfiguration())
> > +        .entrySet()) {
> >       gListInt.put(fMap.get(e.getKey()), e.getValue());
> >     }
> > -    splitter = Pattern.compile(params.get("splitPattern",
> PFPGrowth.SPLITTER
> > -        .toString()));
> >
> >   }
> >  }
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
> Thu Jan  7 16:45:37 2010
> > @@ -17,27 +17,30 @@
> >
> >  package org.apache.mahout.fpm.pfpgrowth;
> >
> > +import java.io.IOException;
> > +import java.util.ArrayList;
> > +import java.util.Collections;
> > +import java.util.Comparator;
> > +import java.util.HashMap;
> > +import java.util.HashSet;
> > +import java.util.Iterator;
> > +import java.util.List;
> > +import java.util.Map;
> > +import java.util.Map.Entry;
> > +
> > +import org.apache.commons.lang.mutable.MutableLong;
> >  import org.apache.hadoop.io.LongWritable;
> >  import org.apache.hadoop.io.Text;
> >  import org.apache.hadoop.mapreduce.Reducer;
> > -import org.apache.mahout.common.IntegerTuple;
> >  import org.apache.mahout.common.Pair;
> >  import org.apache.mahout.common.Parameters;
> > +import org.apache.mahout.fpm.pfpgrowth.convertors.ContextStatusUpdater;
> >  import
> org.apache.mahout.fpm.pfpgrowth.convertors.ContextWriteOutputCollector;
> >  import
> org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerStringOutputConvertor;
> > -import
> org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerTupleIterator;
> >  import
> org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
> >  import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth;
> >  import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPTreeDepthCache;
> >
> > -import java.io.IOException;
> > -import java.util.ArrayList;
> > -import java.util.HashMap;
> > -import java.util.HashSet;
> > -import java.util.List;
> > -import java.util.Map;
> > -import java.util.Map.Entry;
> > -
> >  /**
> >  * {@link ParallelFPGrowthReducer} takes each group of transactions and
> runs
> >  * Vanilla FPGrowth on it and outputs the Top K frequent Patterns for
> each
> > @@ -46,14 +49,16 @@
> >  */
> >
> >  public class ParallelFPGrowthReducer extends
> > -    Reducer<LongWritable, IntegerTuple, Text, TopKStringPatterns> {
> > +    Reducer<LongWritable, TransactionTree, Text, TopKStringPatterns> {
> >
> >   private final List<Pair<Integer, Long>> fList = new
> ArrayList<Pair<Integer, Long>>();
> > -
> > +
> >   private final List<String> featureReverseMap = new ArrayList<String>();
> > -
> > +
> >   private final Map<String, Integer> fMap = new HashMap<String,
> Integer>();
> >
> > +  private final List<String> fRMap = new ArrayList<String>();
> > +
> >   private final Map<Long, List<Integer>> groupFeatures = new
> HashMap<Long, List<Integer>>();
> >
> >   private int maxHeapSize = 50;
> > @@ -61,48 +66,81 @@
> >   private int minSupport = 3;
> >
> >   @Override
> > -  protected void reduce(LongWritable key, Iterable<IntegerTuple> values,
> > +  protected void reduce(LongWritable key, Iterable<TransactionTree>
> values,
> >       Context context) throws IOException {
> > +    TransactionTree cTree = new TransactionTree();
> > +    int nodes = 0;
> > +    for (TransactionTree tr : values) {
> > +      Iterator<Pair<List<Integer>, Long>> it = tr.getIterator();
> > +      while (it.hasNext()) {
> > +        Pair<List<Integer>, Long> p = it.next();
> > +        nodes += cTree.addPattern(p.getFirst(), p.getSecond());
> > +      }
> > +    }
> > +
> > +    List<Pair<Integer, Long>> localFList = new ArrayList<Pair<Integer,
> Long>>();
> > +    for (Entry<Integer, MutableLong> fItem :
> cTree.generateFList().entrySet()) {
> > +      localFList.add(new Pair<Integer, Long>(fItem.getKey(),
> fItem.getValue()
> > +          .toLong()));
> > +
> > +    }
> > +
> > +    Collections.sort(localFList, new Comparator<Pair<Integer, Long>>() {
> > +
> > +      @Override
> > +      public int compare(Pair<Integer, Long> o1, Pair<Integer, Long> o2)
> {
> > +        int ret = o2.getSecond().compareTo(o1.getSecond());
> > +        if (ret != 0) {
> > +          return ret;
> > +        }
> > +        return o1.getFirst().compareTo(o2.getFirst());
> > +      }
> > +
> > +    });
> > +
> > +
> >     FPGrowth<Integer> fpGrowth = new FPGrowth<Integer>();
> >     fpGrowth
> >         .generateTopKFrequentPatterns(
> > -            new IntegerTupleIterator(values.iterator()),
> > -            fList,
> > +            cTree.getIterator(),
> > +            localFList,
> >             minSupport,
> >             maxHeapSize,
> >             new HashSet<Integer>(groupFeatures.get(key.get())),
> >             new IntegerStringOutputConvertor(
> > -                new ContextWriteOutputCollector<LongWritable,
> IntegerTuple, Text, TopKStringPatterns>(
> > -                    context), featureReverseMap));
> > +                new ContextWriteOutputCollector<LongWritable,
> TransactionTree, Text, TopKStringPatterns>(
> > +                    context), featureReverseMap),
> > +            new ContextStatusUpdater<LongWritable, TransactionTree,
> Text, TopKStringPatterns>(
> > +                context));
> >   }
> >
> >   @Override
> > -  protected void setup(Context context) throws IOException,
> InterruptedException {
> > +  protected void setup(Context context) throws IOException,
> > +      InterruptedException {
> >
> >     super.setup(context);
> >     Parameters params =
> Parameters.fromString(context.getConfiguration().get(
> >         "pfp.parameters", ""));
> > -
> > -
> > -
> > +
> >     int i = 0;
> > -    for(Pair<String, Long> e: PFPGrowth.deserializeList(params, "fList",
> context
> > -        .getConfiguration()))
> > -    {
> > +    for (Pair<String, Long> e : PFPGrowth.deserializeList(params,
> "fList",
> > +        context.getConfiguration())) {
> >       featureReverseMap.add(e.getFirst());
> >       fMap.put(e.getFirst(), i);
> > +      fRMap.add(e.getFirst());
> >       fList.add(new Pair<Integer, Long>(i++, e.getSecond()));
> > +
> >     }
> > -
> > +
> >     Map<String, Long> gList = PFPGrowth.deserializeMap(params, "gList",
> context
> >         .getConfiguration());
> > -
> > +
> >     for (Entry<String, Long> entry : gList.entrySet()) {
> >       List<Integer> groupList = groupFeatures.get(entry.getValue());
> >       Integer itemInteger = fMap.get(entry.getKey());
> > -      if (groupList != null)
> > +      if (groupList != null) {
> >         groupList.add(itemInteger);
> > -      else {
> > +      } else {
> >         groupList = new ArrayList<Integer>();
> >         groupList.add(itemInteger);
> >         groupFeatures.put(entry.getValue(), groupList);
> >
> > Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java?rev=896922&view=auto
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
> (added)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
> Thu Jan  7 16:45:37 2010
> > @@ -0,0 +1,85 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.mahout.fpm.pfpgrowth;
> > +
> > +import java.io.IOException;
> > +import java.util.ArrayList;
> > +import java.util.Arrays;
> > +import java.util.Collections;
> > +import java.util.HashMap;
> > +import java.util.HashSet;
> > +import java.util.List;
> > +import java.util.Map;
> > +import java.util.Set;
> > +import java.util.regex.Pattern;
> > +
> > +import org.apache.hadoop.io.LongWritable;
> > +import org.apache.hadoop.io.Text;
> > +import org.apache.hadoop.mapreduce.Mapper;
> > +import org.apache.mahout.common.Pair;
> > +import org.apache.mahout.common.Parameters;
> > +
> > +/**
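> > + * {@link TransactionSortingMapper} splits each input line into items, drops duplicates and items that are not in the fList, and sorts the remaining item ids.
> > + * mapper outputs the first (smallest) item id as key and the transaction, with support 1, as a TransactionTree value; lines with no remaining items are skipped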
> > + *
> > + */
> > +public class TransactionSortingMapper extends Mapper<LongWritable, Text,
> LongWritable, TransactionTree> {
> > +
> > +  private final Map<String, Integer> fMap = new HashMap<String,
> Integer>();
> > +
> > +  private Pattern splitter = null;
> > +
> > +  @Override
> > +  protected void map(LongWritable offset, Text input, Context context)
> throws IOException,
> > +      InterruptedException {
> > +
> > +    String[] items = splitter.split(input.toString());
> > +    Set<String> uniqueItems = new HashSet<String>(Arrays.asList(items));
> > +
> > +    List<Integer> itemSet = new ArrayList<Integer>();
> > +    for (String item : uniqueItems) { // remove items not in the fList
> > +      if (fMap.containsKey(item) && item.trim().length() != 0) {
> > +        itemSet.add(fMap.get(item));
> > +      }
> > +    }
> > +
> > +    Collections.sort(itemSet);
> > +
> > +    Integer[] prunedItems = itemSet.toArray(new
> Integer[itemSet.size()]);
> > +
> > +    if (prunedItems.length > 0) {
> > +      context.write(new LongWritable(prunedItems[0]), new
> TransactionTree(prunedItems, 1L));
> > +    }
> > +
> > +  }
> > +
> > +  @Override
> > +  protected void setup(Context context) throws IOException,
> InterruptedException {
> > +    super.setup(context);
> > +    Parameters params =
> Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
> > +
> > +    int i = 0;
> > +    for (Pair<String, Long> e : PFPGrowth.deserializeList(params,
> "fList", context.getConfiguration())) {
> > +      fMap.put(e.getFirst(), i++);
> > +    }
> > +
> > +    splitter = Pattern.compile(params.get("splitPattern",
> PFPGrowth.SPLITTER.toString()));
> > +
> > +  }
> > +}
> >
> > Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java?rev=896922&view=auto
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java
> (added)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java
> Thu Jan  7 16:45:37 2010
> > @@ -0,0 +1,42 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.mahout.fpm.pfpgrowth;
> > +
> > +import java.io.IOException;
> > +
> > +import org.apache.hadoop.io.LongWritable;
> > +import org.apache.hadoop.mapreduce.Reducer;
> > +/**
> > + * {@link TransactionSortingReducer} re-emits every TransactionTree it receives
> > + * unchanged, replacing the incoming key with the constant key 1; it does not run
> > + * FPGrowth itself.
> > + *
> > + */
> > +
> > +public class TransactionSortingReducer extends
> > +    Reducer<LongWritable, TransactionTree, LongWritable,
> TransactionTree> {
> > +
> > +  private static final LongWritable one = new LongWritable(1);
> > +  @Override
> > +  protected void reduce(LongWritable key, Iterable<TransactionTree>
> values,
> > +      Context context) throws IOException, InterruptedException {
> > +    for (TransactionTree tr : values) {
> > +      context.write(one, tr);
> > +    }
> > +  }
> > +}
> >
> > Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java?rev=896922&view=auto
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java
> (added)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java
> Thu Jan  7 16:45:37 2010
> > @@ -0,0 +1,461 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.mahout.fpm.pfpgrowth;
> > +
> > +import java.io.DataInput;
> > +import java.io.DataOutput;
> > +import java.io.IOException;
> > +import java.util.ArrayList;
> > +import java.util.Arrays;
> > +import java.util.Collections;
> > +import java.util.Comparator;
> > +import java.util.HashMap;
> > +import java.util.Iterator;
> > +import java.util.List;
> > +import java.util.Map;
> > +import java.util.Stack;
> > +
> > +import org.apache.commons.lang.mutable.MutableLong;
> > +import org.apache.hadoop.io.VIntWritable;
> > +import org.apache.hadoop.io.VLongWritable;
> > +import org.apache.hadoop.io.Writable;
> > +import org.apache.mahout.common.Pair;
> > +import org.slf4j.Logger;
> > +import org.slf4j.LoggerFactory;
> > +
> > +public final class TransactionTree implements Writable {
> > +
> > +  public final class TransactionTreeIterator implements
> Iterator<Pair<List<Integer>, Long>> {
> > +
> > +    Stack<int[]> depth = new Stack<int[]>();
> > +
> > +    public TransactionTreeIterator() {
> > +      depth.push(new int[] {0, -1});
> > +    }
> > +
> > +    @Override
> > +    public boolean hasNext() {
> > +      if (depth.isEmpty()) {
> > +        return false;
> > +      }
> > +      return true;
> > +    }
> > +
> > +    @Override
> > +    public Pair<List<Integer>, Long> next() {
> > +
> > +      long sum = 0;
> > +      int childId = 0;
> > +      do {
> > +        int[] top = depth.peek();
> > +        while (top[1] + 1 == childCount[top[0]]) {
> > +          depth.pop();
> > +          top = depth.peek();
> > +        }
> > +        if (depth.isEmpty()) {
> > +          return null;
> > +        }
> > +        top[1]++;
> > +        childId = nodeChildren[top[0]][top[1]];
> > +        depth.push(new int[] {childId, -1});
> > +
> > +        sum = 0;
> > +        for (int i = childCount[childId] - 1; i >= 0; i--) {
> > +          sum += nodeCount[nodeChildren[childId][i]];
> > +        }
> > +      } while (sum == nodeCount[childId]);
> > +
> > +      List<Integer> data = new ArrayList<Integer>();
> > +      Iterator<int[]> it = depth.iterator();
> > +      it.next();
> > +      while (it.hasNext()) {
> > +        data.add(attribute[it.next()[0]]);
> > +      }
> > +
> > +      Pair<List<Integer>, Long> returnable = new Pair<List<Integer>,
> Long>(data, nodeCount[childId] - sum);
> > +
> > +      int[] top = depth.peek();
> > +      while (top[1] + 1 == childCount[top[0]]) {
> > +        depth.pop();
> > +        if (depth.isEmpty()) {
> > +          break;
> > +        }
> > +        top = depth.peek();
> > +      }
> > +      return returnable;
> > +    }
> > +
> > +    @Override
> > +    public void remove() {
> > +      throw new UnsupportedOperationException();
> > +    }
> > +
> > +  }
> > +
> > +  private static final int DEFAULT_CHILDREN_INITIAL_SIZE = 2;
> > +
> > +  private static final int DEFAULT_INITIAL_SIZE = 8;
> > +
> > +  private static final float GROWTH_RATE = 1.5f;
> > +
> > +  private static final Logger log =
> LoggerFactory.getLogger(TransactionTree.class);
> > +
> > +  private static final int ROOTNODEID = 0;
> > +
> > +  private int[] attribute;
> > +
> > +  private int[] childCount;
> > +
> > +  private int[][] nodeChildren;
> > +
> > +  private long[] nodeCount;
> > +
> > +  private int nodes = 0;
> > +
> > +  private boolean representedAsList = false;
> > +
> > +  private List<Pair<List<Integer>, Long>> transactionSet = new
> ArrayList<Pair<List<Integer>, Long>>();
> > +
> > +  public TransactionTree() {
> > +    this(DEFAULT_INITIAL_SIZE);
> > +    representedAsList = false;
> > +  }
> > +
> > +  public TransactionTree(int size) {
> > +    if (size < DEFAULT_INITIAL_SIZE) {
> > +      size = DEFAULT_INITIAL_SIZE;
> > +    }
> > +    childCount = new int[size];
> > +    attribute = new int[size];
> > +    nodeCount = new long[size];
> > +    nodeChildren = new int[size][];
> > +    createRootNode();
> > +    representedAsList = false;
> > +  }
> > +
> > +  public TransactionTree(Integer[] items, Long support) {
> > +    representedAsList = true;
> > +    transactionSet.add(new Pair<List<Integer>,
> Long>(Arrays.asList(items), support));
> > +  }
> > +
> > +  public TransactionTree(List<Pair<List<Integer>, Long>> transactionSet)
> {
> > +    representedAsList = true;
> > +    this.transactionSet = transactionSet;
> > +  }
> > +
> > +  public void addChild(int parentNodeId, int childnodeId) {
> > +    int length = childCount[parentNodeId];
> > +    if (length >= nodeChildren[parentNodeId].length) {
> > +      resizeChildren(parentNodeId);
> > +    }
> > +    nodeChildren[parentNodeId][length++] = childnodeId;
> > +    childCount[parentNodeId] = length;
> > +
> > +  }
> > +
> > +  public boolean addCount(int nodeId, long nextNodeCount) {
> > +    if (nodeId < nodes) {
> > +      this.nodeCount[nodeId] += nextNodeCount;
> > +      return true;
> > +    }
> > +    return false;
> > +  }
> > +
> > +  public int addPattern(List<Integer> myList, long addCount) {
> > +    int temp = TransactionTree.ROOTNODEID;
> > +    int ret = 0;
> > +    boolean addCountMode = true;
> > +    int child = -1;
> > +    for (int attributeValue : myList) {
> > +
> > +      if (addCountMode) {
> > +        child = childWithAttribute(temp, attributeValue);
> > +        if (child == -1) {
> > +          addCountMode = false;
> > +        } else {
> > +          addCount(child, addCount);
> > +          temp = child;
> > +        }
> > +      }
> > +      if (!addCountMode) {
> > +        child = createNode(temp, attributeValue, addCount);
> > +        temp = child;
> > +        ret++;
> > +      }
> > +    }
> > +    return ret;
> > +  }
> > +
> > +  public int attribute(int nodeId) {
> > +    return this.attribute[nodeId];
> > +  }
> > +
> > +  public int childAtIndex(int nodeId, int index) {
> > +    if (childCount[nodeId] < index) {
> > +      return -1;
> > +    }
> > +    return nodeChildren[nodeId][index];
> > +  }
> > +
> > +  public int childCount() {
> > +    int sum = 0;
> > +    for (int i = 0; i < nodes; i++) {
> > +      sum += childCount[i];
> > +    }
> > +    return sum;
> > +  }
> > +
> > +  public int childCount(int nodeId) {
> > +    return childCount[nodeId];
> > +  }
> > +
> > +  public int childWithAttribute(int nodeId, int childAttribute) {
> > +    int length = childCount[nodeId];
> > +    for (int i = 0; i < length; i++) {
> > +      if (attribute[nodeChildren[nodeId][i]] == childAttribute) {
> > +        return nodeChildren[nodeId][i];
> > +      }
> > +    }
> > +    return -1;
> > +  }
> > +
> > +  public long count(int nodeId) {
> > +    return nodeCount[nodeId];
> > +  }
> > +
> > +  public Map<Integer, MutableLong> generateFList() {
> > +    Map<Integer, MutableLong> frequencyList = new HashMap<Integer,
> MutableLong>();
> > +    Iterator<Pair<List<Integer>, Long>> it = getIterator();
> > +    int items = 0;
> > +    int count = 0;
> > +    while (it.hasNext()) {
> > +      Pair<List<Integer>, Long> p = it.next();
> > +      items += p.getFirst().size();
> > +      count++;
> > +      for (Integer i : p.getFirst()) {
> > +        if (frequencyList.containsKey(i) == false) {
> > +          frequencyList.put(i, new MutableLong(0));
> > +        }
> > +        frequencyList.get(i).add(p.getSecond());
> > +      }
> > +    }
> > +    return frequencyList;
> > +  }
> > +
> > +  public TransactionTree getCompressedTree() {
> > +    TransactionTree ctree = new TransactionTree();
> > +    Iterator<Pair<List<Integer>, Long>> it = getIterator();
> > +    final Map<Integer, MutableLong> fList = generateFList();
> > +    int node = 0;
> > +    Comparator<Integer> comparator = new Comparator<Integer>() {
> > +
> > +      @Override
> > +      public int compare(Integer o1, Integer o2) {
> > +        return fList.get(o2).compareTo(fList.get(o1));
> > +      }
> > +
> > +    };
> > +    int size = 0;
> > +    List<Pair<List<Integer>, Long>> compressedTransactionSet = new
> ArrayList<Pair<List<Integer>, Long>>();
> > +    while (it.hasNext()) {
> > +      Pair<List<Integer>, Long> p = it.next();
> > +      Collections.sort(p.getFirst(), comparator);
> > +      compressedTransactionSet.add(p);
> > +      node += ctree.addPattern(p.getFirst(), p.getSecond());
> > +      size += p.getFirst().size() + 2;
> > +    }
> > +
> > +    log.debug("Nodes in UnCompressed Tree: {} ", nodes);
> > +    log
> > +        .debug("UnCompressed Tree Size: {}", (this.nodes * 4 * 4 +
> this.childCount() * 4)
> > +            / (double) 1000000);
> > +    log.debug("Nodes in Compressed Tree: {} ", node);
> > +    log.debug("Compressed Tree Size: {}", (node * 4 * 4 +
> ctree.childCount() * 4) / (double) 1000000);
> > +    log.debug("TransactionSet Size: {}", (size * 4) / (double) 1000000);
> > +    if (node * 4 * 4 + ctree.childCount() * 4 <= size * 4) {
> > +      return ctree;
> > +    } else {
> > +      ctree = new TransactionTree(compressedTransactionSet);
> > +      return ctree;
> > +    }
> > +  }
> > +
> > +  public Iterator<Pair<List<Integer>, Long>> getIterator() {
> > +    if (this.isTreeEmpty() && !representedAsList) {
> > +      throw new IllegalStateException("This is a bug. Please report this
> to mahout-user list");
> > +    } else if (representedAsList) {
> > +      return transactionSet.iterator();
> > +    } else {
> > +      return new TransactionTreeIterator();
> > +    }
> > +  }
> > +
> > +  public boolean isTreeEmpty() {
> > +    return nodes <= 1;
> > +  }
> > +
> > +  @Override
> > +  public void readFields(DataInput in) throws IOException {
> > +    representedAsList = in.readBoolean();
> > +
> > +    VIntWritable vInt = new VIntWritable();
> > +    VLongWritable vLong = new VLongWritable();
> > +
> > +    if (representedAsList) {
> > +      transactionSet = new ArrayList<Pair<List<Integer>, Long>>();
> > +      vInt.readFields(in);
> > +      int numTransactions = vInt.get();
> > +      for (int i = 0; i < numTransactions; i++) {
> > +        vLong.readFields(in);
> > +        Long support = vLong.get();
> > +
> > +        vInt.readFields(in);
> > +        int length = vInt.get();
> > +
> > +        Integer[] items = new Integer[length];
> > +        for (int j = 0; j < length; j++) {
> > +          vInt.readFields(in);
> > +          items[j] = vInt.get();
> > +        }
> > +        Pair<List<Integer>, Long> transaction = new Pair<List<Integer>,
> Long>(Arrays.asList(items), support);
> > +        transactionSet.add(transaction);
> > +      }
> > +    } else {
> > +      vInt.readFields(in);
> > +      nodes = vInt.get();
> > +      attribute = new int[nodes];
> > +      nodeCount = new long[nodes];
> > +      childCount = new int[nodes];
> > +      nodeChildren = new int[nodes][];
> > +      for (int i = 0; i < nodes; i++) {
> > +        vInt.readFields(in);
> > +        attribute[i] = vInt.get();
> > +        vLong.readFields(in);
> > +        nodeCount[i] = vLong.get();
> > +        vInt.readFields(in);
> > +        childCount[i] = vInt.get();
> > +        nodeChildren[i] = new int[childCount[i]];
> > +        for (int j = 0, k = childCount[i]; j < k; j++) {
> > +          vInt.readFields(in);
> > +          nodeChildren[i][j] = vInt.get();
> > +        }
> > +      }
> > +    }
> > +  }
> > +
> > +  @Override
> > +  public void write(DataOutput out) throws IOException {
> > +    out.writeBoolean(representedAsList);
> > +    VIntWritable vInt = new VIntWritable();
> > +    VLongWritable vLong = new VLongWritable();
> > +    if (representedAsList) {
> > +      vInt.set(transactionSet.size());
> > +      vInt.write(out);
> > +      for (int i = 0, j = transactionSet.size(); i < j; i++) {
> > +        Pair<List<Integer>, Long> transaction = transactionSet.get(i);
> > +
> > +        vLong.set(transaction.getSecond().longValue());
> > +        vLong.write(out);
> > +
> > +        vInt.set(transaction.getFirst().size());
> > +        vInt.write(out);
> > +
> > +        for (Integer item : transaction.getFirst()) {
> > +          vInt.set(item);
> > +          vInt.write(out);
> > +        }
> > +      }
> > +    } else {
> > +      vInt.set(nodes);
> > +      vInt.write(out);
> > +      for (int i = 0; i < nodes; i++) {
> > +        vInt.set(attribute[i]);
> > +        vInt.write(out);
> > +        vLong.set(nodeCount[i]);
> > +        vLong.write(out);
> > +        vInt.set(childCount[i]);
> > +        vInt.write(out);
> > +        for (int j = 0, k = childCount[i]; j < k; j++) {
> > +          vInt.set(nodeChildren[i][j]);
> > +          vInt.write(out);
> > +        }
> > +      }
> > +    }
> > +  }
> > +
> > +  private int createNode(int parentNodeId, int attributeValue, long count) {
> > +    if (nodes >= this.attribute.length) {
> > +      resize();
> > +    }
> > +
> > +    childCount[nodes] = 0;
> > +    this.attribute[nodes] = attributeValue;
> > +    nodeCount[nodes] = count;
> > +    if (nodeChildren[nodes] == null) {
> > +      nodeChildren[nodes] = new int[DEFAULT_CHILDREN_INITIAL_SIZE];
> > +    }
> > +
> > +    int childNodeId = nodes++;
> > +    addChild(parentNodeId, childNodeId);
> > +    return childNodeId;
> > +  }
> > +
> > +  private int createRootNode() {
> > +    childCount[nodes] = 0;
> > +    attribute[nodes] = -1;
> > +    nodeCount[nodes] = 0;
> > +    if (nodeChildren[nodes] == null) {
> > +      nodeChildren[nodes] = new int[DEFAULT_CHILDREN_INITIAL_SIZE];
> > +    }
> > +    int childNodeId = nodes++;
> > +    return childNodeId;
> > +  }
> > +
> > +  private void resize() {
> > +    int size = (int) (GROWTH_RATE * nodes);
> > +    if (size < DEFAULT_INITIAL_SIZE) {
> > +      size = DEFAULT_INITIAL_SIZE;
> > +    }
> > +
> > +    int[] oldChildCount = childCount;
> > +    int[] oldAttribute = attribute;
> > +    long[] oldnodeCount = nodeCount;
> > +    int[][] oldNodeChildren = nodeChildren;
> > +
> > +    childCount = new int[size];
> > +    attribute = new int[size];
> > +    nodeCount = new long[size];
> > +    nodeChildren = new int[size][];
> > +
> > +    System.arraycopy(oldChildCount, 0, this.childCount, 0, nodes);
> > +    System.arraycopy(oldAttribute, 0, this.attribute, 0, nodes);
> > +    System.arraycopy(oldnodeCount, 0, this.nodeCount, 0, nodes);
> > +    System.arraycopy(oldNodeChildren, 0, this.nodeChildren, 0, nodes);
> > +  }
> > +
> > +  private void resizeChildren(int nodeId) {
> > +    int length = childCount[nodeId];
> > +    int size = (int) (GROWTH_RATE * length);
> > +    if (size < DEFAULT_CHILDREN_INITIAL_SIZE) {
> > +      size = DEFAULT_CHILDREN_INITIAL_SIZE;
> > +    }
> > +    int[] oldNodeChildren = nodeChildren[nodeId];
> > +    nodeChildren[nodeId] = new int[size];
> > +    System.arraycopy(oldNodeChildren, 0, this.nodeChildren[nodeId], 0, length);
> > +  }
> > +}
> >
> > Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java?rev=896922&view=auto
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java
> (added)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java
> Thu Jan  7 16:45:37 2010
> > @@ -0,0 +1,46 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.mahout.fpm.pfpgrowth.convertors;
> > +
> > +import org.apache.hadoop.io.Writable;
> > +import org.apache.hadoop.mapreduce.Reducer;
> > +
> > +public class ContextStatusUpdater<IK extends Writable, IV extends Writable,
> > +    K extends Writable, V extends Writable> implements StatusUpdater {
> > +
> > +  private static final long PERIOD = 10000; // Update every 10 seconds
> > +
> > +  private final Reducer<IK, IV, K, V>.Context context;
> > +
> > +  private long time = System.currentTimeMillis();
> > +
> > +  public ContextStatusUpdater(Reducer<IK, IV, K, V>.Context context) {
> > +    this.context = context;
> > +  }
> > +
> > +  @Override
> > +  public void update(String status) {
> > +    long curTime = System.currentTimeMillis();
> > +    if (curTime - time > PERIOD && context != null) {
> > +      time = curTime;
> > +      context.setStatus("Processing FPTree: " + status);
> > +    }
> > +
> > +  }
> > +
> > +}
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
> Thu Jan  7 16:45:37 2010
> > @@ -25,8 +25,8 @@
> >  import org.slf4j.Logger;
> >  import org.slf4j.LoggerFactory;
> >
> > -public class ContextWriteOutputCollector<IK extends Writable, IV extends
> Writable, K extends Writable, V extends Writable>
> > -    implements OutputCollector<K, V> {
> > +public class ContextWriteOutputCollector<IK extends Writable, IV extends
> Writable,
> > +    K extends Writable, V extends Writable> implements
> OutputCollector<K, V> {
> >
> >   private static final Logger log = LoggerFactory
> >       .getLogger(ContextWriteOutputCollector.class);
> >
> > Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java?rev=896922&view=auto
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java
> (added)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java
> Thu Jan  7 16:45:37 2010
> > @@ -0,0 +1,23 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.mahout.fpm.pfpgrowth.convertors;
> > +
> > +public interface StatusUpdater {
> > +
> > +  void update(String status);
> > +}
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
> Thu Jan  7 16:45:37 2010
> > @@ -19,40 +19,46 @@
> >
> >  import java.io.IOException;
> >  import java.util.ArrayList;
> > +import java.util.Collections;
> >  import java.util.List;
> >  import java.util.Map;
> > +import java.util.PriorityQueue;
> > +
> >  import org.apache.hadoop.mapred.OutputCollector;
> >  import org.apache.mahout.common.Pair;
> >  import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FrequentPatternMaxHeap;
> >  import org.apache.mahout.fpm.pfpgrowth.fpgrowth.Pattern;
> >
> > -public final class TopKPatternsOutputConvertor<A> implements
> > +public final class TopKPatternsOutputConvertor<A extends Comparable<? super A>> implements
> >     OutputCollector<Integer, FrequentPatternMaxHeap> {
> >
> >   private OutputCollector<A, List<Pair<List<A>, Long>>> collector = null;
> >
> >   private Map<Integer, A> reverseMapping = null;
> >
> > -  public TopKPatternsOutputConvertor(
> > -      OutputCollector<A, List<Pair<List<A>, Long>>> collector,
> > +  public TopKPatternsOutputConvertor(OutputCollector<A, List<Pair<List<A>, Long>>> collector,
> >       Map<Integer, A> reverseMapping) {
> >     this.collector = collector;
> >     this.reverseMapping = reverseMapping;
> >   }
> >
> >   @Override
> > -  public void collect(Integer key, FrequentPatternMaxHeap value)
> > -      throws IOException {
> > +  public void collect(Integer key, FrequentPatternMaxHeap value) throws IOException {
> >     List<Pair<List<A>, Long>> perAttributePatterns = new
> ArrayList<Pair<List<A>, Long>>();
> > -    for (Pattern itemSet : value.getHeap()) {
> > +    PriorityQueue<Pattern> t = value.getHeap();
> > +    while (t.size() > 0) {
> > +      Pattern itemSet = t.poll();
> >       List<A> frequentPattern = new ArrayList<A>();
> >       for (int j = 0; j < itemSet.length(); j++) {
> >         frequentPattern.add(reverseMapping.get(itemSet.getPattern()[j]));
> >       }
> > -      Pair<List<A>, Long> returnItemSet = new Pair<List<A>, Long>(
> > -          frequentPattern, itemSet.support());
> > +      Collections.sort(frequentPattern);
> > +
> > +      Pair<List<A>, Long> returnItemSet = new Pair<List<A>, Long>(frequentPattern, itemSet.support());
> >       perAttributePatterns.add(returnItemSet);
> >     }
> > +    Collections.reverse(perAttributePatterns);
> > +
> >     collector.collect(reverseMapping.get(key), perAttributePatterns);
> >   }
> >  }
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
> Thu Jan  7 16:45:37 2010
> > @@ -21,14 +21,16 @@
> >  import java.util.List;
> >  import java.util.Map;
> >
> > -public class TransactionIterator<AP> implements Iterator<int[]> {
> > +import org.apache.mahout.common.Pair;
> > +
> > +public class TransactionIterator<AP> implements Iterator<Pair<int[], Long>> {
> >   private Map<AP, Integer> attributeIdMapping = null;
> >
> > -  private Iterator<List<AP>> iterator = null;
> > +  private Iterator<Pair<List<AP>, Long>> iterator = null;
> >
> >   private int[] transactionBuffer = null;
> >
> > -  public TransactionIterator(Iterator<List<AP>> iterator,
> > +  public TransactionIterator(Iterator<Pair<List<AP>, Long>> iterator,
> >       Map<AP, Integer> attributeIdMapping) {
> >     this.attributeIdMapping = attributeIdMapping;
> >     this.iterator = iterator;
> > @@ -41,18 +43,19 @@
> >   }
> >
> >   @Override
> > -  public final int[] next() {
> > -    List<AP> transaction = iterator.next();
> > +  public final Pair<int[], Long> next() {
> > +    Pair<List<AP>, Long> transaction = iterator.next();
> >     int index = 0;
> > -    for (AP Attribute : transaction) {
> > -      if (attributeIdMapping.containsKey(Attribute)) {
> > -        transactionBuffer[index++] = attributeIdMapping.get(Attribute);
> > +
> > +    for (AP attribute : transaction.getFirst()) {
> > +      if (attributeIdMapping.containsKey(attribute)) {
> > +        transactionBuffer[index++] = attributeIdMapping.get(attribute);
> >       }
> >     }
> >
> >     int[] transactionList = new int[index];
> >     System.arraycopy(transactionBuffer, 0, transactionList, 0, index);
> > -    return transactionList;
> > +    return new Pair<int[], Long>(transactionList, transaction.getSecond());
> >
> >   }
> >
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
> Thu Jan  7 16:45:37 2010
> > @@ -42,7 +42,7 @@
> >   @Override
> >   public void collect(Integer key, List<Pair<List<Integer>, Long>> value)
> >       throws IOException {
> > -    String StringKey = featureReverseMap.get(key);
> > +    String stringKey = featureReverseMap.get(key);
> >     List<Pair<List<String>, Long>> stringValues = new
> ArrayList<Pair<List<String>, Long>>();
> >     for (Pair<List<Integer>, Long> e : value) {
> >       List<String> pattern = new ArrayList<String>();
> > @@ -53,7 +53,7 @@
> >     }
> >
> >     collector
> > -        .collect(new Text(StringKey), new
> TopKStringPatterns(stringValues));
> > +        .collect(new Text(stringKey), new
> TopKStringPatterns(stringValues));
> >   }
> >
> >  }
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
> Thu Jan  7 16:45:37 2010
> > @@ -25,7 +25,7 @@
> >  public final class IntegerTupleIterator implements
> Iterator<List<Integer>> {
> >
> >   private Iterator<IntegerTuple> iterator = null;
> > -
> > +
> >   public IntegerTupleIterator(Iterator<IntegerTuple> iterator) {
> >     this.iterator = iterator;
> >   }
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
> Thu Jan  7 16:45:37 2010
> > @@ -55,10 +55,12 @@
> >     Pair<List<String>, Long> myItem = null;
> >     Pair<List<String>, Long> otherItem = null;
> >     for (int i = 0; i < heapSize; i++) {
> > -      if (myItem == null && myIterator.hasNext())
> > +      if (myItem == null && myIterator.hasNext()) {
> >         myItem = myIterator.next();
> > -      if (otherItem == null && otherIterator.hasNext())
> > +      }
> > +      if (otherItem == null && otherIterator.hasNext()) {
> >         otherItem = otherIterator.next();
> > +      }
> >       if (myItem != null && otherItem != null) {
> >         int cmp = myItem.getSecond().compareTo(otherItem.getSecond());
> >         if (cmp == 0) {
> > @@ -89,8 +91,9 @@
> >       } else if (otherItem != null) {
> >         patterns.add(otherItem);
> >         otherItem = null;
> > -      } else
> > +      } else {
> >         break;
> > +      }
> >     }
> >     return new TopKStringPatterns(patterns);
> >   }
> >
> >
> >
>