You are viewing a plain-text version of this content; the link to the canonical version is not preserved in plain text.
Posted to dev@mahout.apache.org by deneche abdelhakim <ad...@gmail.com> on 2010/01/08 09:10:26 UTC
Re: svn commit: r896922 [1/3] - in /lucene/mahout/trunk:
core/src/main/java/org/apache/mahout/common/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/
core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/
core/src/main/java/org/apache/mah
Hi Robin,
I'm getting a build failure with "mvn clean install":
[INFO] Compilation failure
/home/hakim/mahout-trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleGrouper.java:[89,24]
cannot find symbol
symbol : class KeyBasedStringTupleReducer
location: class
org.apache.mahout.fpm.pfpgrowth.example.dataset.KeyBasedStringTupleGrouper
On Thu, Jan 7, 2010 at 5:48 PM, <ro...@apache.org> wrote:
> Author: robinanil
> Date: Thu Jan 7 16:45:37 2010
> New Revision: 896922
>
> URL: http://svn.apache.org/viewvc?rev=896922&view=rev
> Log:
> MAHOUT-221 FP-Bonsai Pruning and Transaction Compaction
>
> Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java
> lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/TransactionTreeTest.java
> lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/
> lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/DeliciousTagsExample.java
> lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/
> lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleCombiner.java
> lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleGrouper.java
> lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleMapper.java
> Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPGrowth.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTree.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTreeDepthCache.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FrequentPatternMaxHeap.java
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/Pattern.java
> lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthTest.java
> lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
> lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthJob.java
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java Thu Jan 7 16:45:37 2010
> @@ -22,31 +22,31 @@
> import java.util.List;
> import java.util.regex.Pattern;
>
> -public class StringRecordIterator implements Iterator<List<String>> {
> -
> +public class StringRecordIterator implements Iterator<Pair<List<String>,Long>> {
> +
> private final Iterator<String> lineIterator;
> private Pattern splitter = null;
> -
> +
> public StringRecordIterator(FileLineIterable iterable, String pattern) {
> this.lineIterator = iterable.iterator();
> this.splitter = Pattern.compile(pattern);
> }
> -
> +
> @Override
> public boolean hasNext() {
> return lineIterator.hasNext();
> }
> -
> +
> @Override
> - public List<String> next() {
> + public Pair<List<String>,Long> next() {
> String line = lineIterator.next();
> String[] items = splitter.split(line);
> - return Arrays.asList(items);
> + return new Pair<List<String>,Long>(Arrays.asList(items), Long.valueOf(1));
> }
> -
> +
> @Override
> public void remove() {
> lineIterator.remove();
> }
> -
> -}
> +
> +}
> \ No newline at end of file
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java Thu Jan 7 16:45:37 2010
> @@ -17,34 +17,32 @@
>
> package org.apache.mahout.fpm.pfpgrowth;
>
> +import java.io.IOException;
> +import java.util.ArrayList;
> +import java.util.List;
> +
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.mahout.common.Pair;
> import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
>
> -import java.io.IOException;
> -import java.util.ArrayList;
> -import java.util.List;
> -
> /**
> *
> - * {@link AggregatorMapper} outputs the pattern for each item in the pattern, so
> - * that reducer can group them and select the top K frequent patterns
> + * {@link AggregatorMapper} outputs the pattern for each item in the pattern, so that reducer can group them
> + * and select the top K frequent patterns
> *
> */
> -public class AggregatorMapper extends
> - Mapper<Text, TopKStringPatterns, Text, TopKStringPatterns> {
> +public class AggregatorMapper extends Mapper<Text, TopKStringPatterns, Text, TopKStringPatterns> {
>
> @Override
> - protected void map(Text key, TopKStringPatterns values, Context context)
> - throws IOException, InterruptedException {
> + protected void map(Text key, TopKStringPatterns values, Context context) throws IOException,
> + InterruptedException {
> for (Pair<List<String>, Long> pattern : values.getPatterns()) {
> for (String item : pattern.getFirst()) {
> List<Pair<List<String>, Long>> patternSingularList = new ArrayList<Pair<List<String>, Long>>();
> patternSingularList.add(pattern);
> - context.setStatus("Aggregator Mapper:Grouping Patterns for "+item);
> - context.write(new Text(item),
> - new TopKStringPatterns(patternSingularList));
> + context.setStatus("Aggregator Mapper:Grouping Patterns for " + item);
> + context.write(new Text(item), new TopKStringPatterns(patternSingularList));
> }
> }
>
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java Thu Jan 7 16:45:37 2010
> @@ -17,11 +17,12 @@
>
> package org.apache.mahout.fpm.pfpgrowth;
>
> +import java.io.IOException;
> +
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Reducer;
> import org.apache.mahout.common.Parameters;
> import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
> -import java.io.IOException;
>
> /**
> *
>
> Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java?rev=896922&view=auto
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java (added)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java Thu Jan 7 16:45:37 2010
> @@ -0,0 +1,70 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.mahout.fpm.pfpgrowth;
> +
> +import java.util.Iterator;
> +import java.util.List;
> +
> +import org.apache.mahout.common.Pair;
> +
> +public final class MultiTransactionTreeIterator implements Iterator<List<Integer>> {
> +
> + private Iterator<Pair<List<Integer>, Long>> pIterator = null;
> +
> + private Pair<List<Integer>, Long> currentPattern = null;
> +
> + private long currentCount = 0;
> +
> + public MultiTransactionTreeIterator(Iterator<Pair<List<Integer>, Long>> iterator) {
> + this.pIterator = iterator;
> +
> + if (pIterator.hasNext()) {
> + currentPattern = pIterator.next();
> + currentCount = 0;
> + } else {
> + pIterator = null;
> + }
> +
> + }
> +
> + @Override
> + public boolean hasNext() {
> + return pIterator != null;
> + }
> +
> + @Override
> + public List<Integer> next() {
> + List<Integer> returnable = currentPattern.getFirst();
> + currentCount++;
> + if (currentCount == currentPattern.getSecond().longValue()) {
> + if (pIterator.hasNext()) {
> + currentPattern = pIterator.next();
> + currentCount = 0;
> + } else {
> + pIterator = null;
> + }
> + }
> + return returnable;
> + }
> +
> + @Override
> + public void remove() {
> + throw new UnsupportedOperationException();
> + }
> +
> +}
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Thu Jan 7 16:45:37 2010
> @@ -42,7 +42,6 @@
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
> import org.apache.hadoop.util.GenericsUtil;
> -import org.apache.mahout.common.IntegerTuple;
> import org.apache.mahout.common.Pair;
> import org.apache.mahout.common.Parameters;
> import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
> @@ -52,11 +51,11 @@
>
> /**
> *
> - * Parallel FP Growth Driver Class. Runs each stage of PFPGrowth as described in
> - * the paper http://infolab.stanford.edu/~echang/recsys08-69.pdf
> + * Parallel FP Growth Driver Class. Runs each stage of PFPGrowth as described in the paper
> + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
> *
> */
> -public class PFPGrowth {
> +public final class PFPGrowth {
> public static final Pattern SPLITTER = Pattern.compile("[ ,\t]*[,|\t][ ,\t]*");
>
> private static final Logger log = LoggerFactory.getLogger(PFPGrowth.class);
> @@ -73,12 +72,11 @@
> * @return Deserialized Feature Frequency List
> * @throws IOException
> */
> - public static List<Pair<String, Long>> deserializeList(Parameters params,
> - String key, Configuration conf) throws IOException {
> + public static List<Pair<String, Long>> deserializeList(Parameters params, String key, Configuration conf)
> + throws IOException {
> List<Pair<String, Long>> list = new ArrayList<Pair<String, Long>>();
> - conf.set(
> - "io.serializations",
> - "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> + conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
> + + "org.apache.hadoop.io.serializer.WritableSerialization");
>
> DefaultStringifier<List<Pair<String, Long>>> listStringifier = new DefaultStringifier<List<Pair<String, Long>>>(
> conf, GenericsUtil.getClass(list));
> @@ -89,8 +87,8 @@
> }
>
> /**
> - * Generates the gList(Group ID Mapping of Various frequent Features) Map from
> - * the corresponding serialized representation
> + * Generates the gList(Group ID Mapping of Various frequent Features) Map from the corresponding serialized
> + * representation
> *
> * @param params
> * @param key
> @@ -98,16 +96,14 @@
> * @return Deserialized Group List
> * @throws IOException
> */
> - public static Map<String, Long> deserializeMap(Parameters params, String key,
> - Configuration conf) throws IOException {
> + public static Map<String, Long> deserializeMap(Parameters params, String key, Configuration conf)
> + throws IOException {
> Map<String, Long> map = new HashMap<String, Long>();
> - conf
> - .set(
> - "io.serializations",
> - "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> + conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
> + + "org.apache.hadoop.io.serializer.WritableSerialization");
>
> - DefaultStringifier<Map<String, Long>> mapStringifier = new DefaultStringifier<Map<String, Long>>(
> - conf, GenericsUtil.getClass(map));
> + DefaultStringifier<Map<String, Long>> mapStringifier = new DefaultStringifier<Map<String, Long>>(conf,
> + GenericsUtil.getClass(map));
> String gListString = mapStringifier.toString(map);
> gListString = params.get(key, gListString);
> map = mapStringifier.fromString(gListString);
> @@ -115,33 +111,30 @@
> }
>
> /**
> - * read the feature frequency List which is built at the end of the Parallel
> - * counting job
> + * read the feature frequency List which is built at the end of the Parallel counting job
> *
> * @param params
> * @return Feature Frequency List
> * @throws IOException
> */
> - public static List<Pair<String, Long>> readFList(Parameters params)
> - throws IOException {
> + public static List<Pair<String, Long>> readFList(Parameters params) throws IOException {
> Writable key = new Text();
> LongWritable value = new LongWritable();
> int minSupport = Integer.valueOf(params.get("minSupport", "3"));
> Configuration conf = new Configuration();
>
> - FileSystem fs = FileSystem.get(new Path(params.get("output")
> - + "/parallelcounting").toUri(), conf);
> - FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output")
> - + "/parallelcounting/part-*"));
> + FileSystem fs = FileSystem.get(new Path(params.get("output") + "/parallelcounting").toUri(), conf);
> + FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output") + "/parallelcounting/part-*"));
>
> - PriorityQueue<Pair<String, Long>> queue = new PriorityQueue<Pair<String, Long>>(
> - 11, new Comparator<Pair<String, Long>>() {
> + PriorityQueue<Pair<String, Long>> queue = new PriorityQueue<Pair<String, Long>>(11,
> + new Comparator<Pair<String, Long>>() {
>
> @Override
> public int compare(Pair<String, Long> o1, Pair<String, Long> o2) {
> int ret = o2.getSecond().compareTo(o1.getSecond());
> - if (ret != 0)
> + if (ret != 0) {
> return ret;
> + }
> return o1.getFirst().compareTo(o2.getFirst());
> }
>
> @@ -151,14 +144,16 @@
> SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
> // key is feature value is count
> while (reader.next(key, value)) {
> - if (value.get() < minSupport)
> + if (value.get() < minSupport) {
> continue;
> + }
> queue.add(new Pair<String, Long>(key.toString(), value.get()));
> }
> }
> List<Pair<String, Long>> fList = new ArrayList<Pair<String, Long>>();
> - while (queue.isEmpty() == false)
> + while (queue.isEmpty() == false) {
> fList.add(queue.poll());
> + }
> return fList;
> }
>
> @@ -169,15 +164,13 @@
> * @return List of TopK patterns for each string frequent feature
> * @throws IOException
> */
> - public static List<Pair<String, TopKStringPatterns>> readFrequentPattern(
> - Parameters params) throws IOException {
> + public static List<Pair<String, TopKStringPatterns>> readFrequentPattern(Parameters params)
> + throws IOException {
>
> Configuration conf = new Configuration();
>
> - FileSystem fs = FileSystem.get(new Path(params.get("output")
> - + "/frequentPatterns").toUri(), conf);
> - FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output")
> - + "/frequentPatterns/part-*"));
> + FileSystem fs = FileSystem.get(new Path(params.get("output") + "/frequentPatterns").toUri(), conf);
> + FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output") + "/frequentPatterns/part-*"));
>
> List<Pair<String, TopKStringPatterns>> ret = new ArrayList<Pair<String, TopKStringPatterns>>();
> for (FileStatus fileStatus : outputFiles) {
> @@ -189,42 +182,42 @@
>
> /**
> *
> - * @param params params should contain input and output locations as a string
> - * value, the additional parameters include minSupport(3),
> - * maxHeapSize(50), numGroups(1000)
> + * @param params params should contain input and output locations as a string value, the additional
> + * parameters include minSupport(3), maxHeapSize(50), numGroups(1000)
> * @throws IOException
> * @throws ClassNotFoundException
> * @throws InterruptedException
> */
> - public static void runPFPGrowth(Parameters params) throws IOException,
> - InterruptedException, ClassNotFoundException {
> + public static void runPFPGrowth(Parameters params) throws IOException, InterruptedException,
> + ClassNotFoundException {
> startParallelCounting(params);
> startGroupingItems(params);
> + startTransactionSorting(params);
> startParallelFPGrowth(params);
> startAggregating(params);
> }
>
> /**
> - * Run the aggregation Job to aggregate the different TopK patterns and group
> - * each Pattern by the features present in it and thus calculate the final Top
> - * K frequent Patterns for each feature
> + * Run the aggregation Job to aggregate the different TopK patterns and group each Pattern by the features
> + * present in it and thus calculate the final Top K frequent Patterns for each feature
> *
> * @param params
> * @throws IOException
> * @throws InterruptedException
> * @throws ClassNotFoundException
> */
> - public static void startAggregating(Parameters params) throws IOException,
> - InterruptedException, ClassNotFoundException {
> + public static void startAggregating(Parameters params) throws IOException, InterruptedException,
> + ClassNotFoundException {
>
> Configuration conf = new Configuration();
> params.set("fList", "");
> params.set("gList", "");
> conf.set("pfp.parameters", params.toString());
> + conf.set("mapred.compress.map.output", "true");
> + conf.set("mapred.output.compression.type", "BLOCK");
>
> String input = params.get("output") + "/fpgrowth";
> - Job job = new Job(conf, "PFP Aggregator Driver running over input: "
> - + input);
> + Job job = new Job(conf, "PFP Aggregator Driver running over input: " + input);
> job.setJarByClass(PFPGrowth.class);
>
> job.setOutputKeyClass(Text.class);
> @@ -248,8 +241,7 @@
> }
>
> /**
> - * Group the given Features into g groups as defined by the numGroups
> - * parameter in params
> + * Group the given Features into g groups as defined by the numGroups parameter in params
> *
> * @param params
> * @throws IOException
> @@ -261,14 +253,15 @@
>
> Map<String, Long> gList = new HashMap<String, Long>();
> long maxPerGroup = fList.size() / numGroups;
> - if (fList.size() != maxPerGroup * numGroups)
> + if (fList.size() != maxPerGroup * numGroups) {
> maxPerGroup++;
> + }
>
> long i = 0;
> long groupID = 0;
> for (Pair<String, Long> featureFreq : fList) {
> String feature = featureFreq.getFirst();
> - if (i / (maxPerGroup) == groupID) {
> + if (i / maxPerGroup == groupID) {
> gList.put(feature, groupID);
> } else {
> groupID++;
> @@ -291,15 +284,17 @@
> * @throws InterruptedException
> * @throws ClassNotFoundException
> */
> - public static void startParallelCounting(Parameters params)
> - throws IOException, InterruptedException, ClassNotFoundException {
> + public static void startParallelCounting(Parameters params) throws IOException, InterruptedException,
> + ClassNotFoundException {
>
> Configuration conf = new Configuration();
> conf.set("pfp.parameters", params.toString());
>
> + conf.set("mapred.compress.map.output", "true");
> + conf.set("mapred.output.compression.type", "BLOCK");
> +
> String input = params.get("input");
> - Job job = new Job(conf, "Parallel Counting Driver running over input: "
> - + input);
> + Job job = new Job(conf, "Parallel Counting Driver running over input: " + input);
> job.setJarByClass(PFPGrowth.class);
>
> job.setOutputKeyClass(Text.class);
> @@ -325,26 +320,71 @@
> }
>
> /**
> - * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of
> - * group dependent shards
> + * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of group dependent shards
> *
> * @param params
> * @throws IOException
> * @throws InterruptedException
> * @throws ClassNotFoundException
> */
> - public static void startParallelFPGrowth(Parameters params)
> - throws IOException, InterruptedException, ClassNotFoundException {
> + public static void startTransactionSorting(Parameters params) throws IOException, InterruptedException,
> + ClassNotFoundException {
>
> Configuration conf = new Configuration();
> + String gList = params.get("gList");
> + params.set("gList", "");
> conf.set("pfp.parameters", params.toString());
> -
> + conf.set("mapred.compress.map.output", "true");
> + conf.set("mapred.output.compression.type", "BLOCK");
> String input = params.get("input");
> + Job job = new Job(conf, "PFP Transaction Sorting running over input" + input);
> + job.setJarByClass(PFPGrowth.class);
> +
> + job.setMapOutputKeyClass(LongWritable.class);
> + job.setMapOutputValueClass(TransactionTree.class);
> +
> + job.setOutputKeyClass(LongWritable.class);
> + job.setOutputValueClass(TransactionTree.class);
> +
> + FileInputFormat.addInputPath(job, new Path(input));
> + Path outPath = new Path(params.get("output") + "/sortedoutput");
> + FileOutputFormat.setOutputPath(job, outPath);
> +
> + FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
> + if (dfs.exists(outPath)) {
> + dfs.delete(outPath, true);
> + }
> +
> + job.setInputFormatClass(TextInputFormat.class);
> + job.setMapperClass(TransactionSortingMapper.class);
> + job.setReducerClass(TransactionSortingReducer.class);
> + job.setOutputFormatClass(SequenceFileOutputFormat.class);
> +
> + job.waitForCompletion(true);
> + params.set("gList", gList);
> + }
> +
> + /**
> + * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of group dependent shards
> + *
> + * @param params
> + * @throws IOException
> + * @throws InterruptedException
> + * @throws ClassNotFoundException
> + */
> + public static void startParallelFPGrowth(Parameters params) throws IOException, InterruptedException,
> + ClassNotFoundException {
> +
> + Configuration conf = new Configuration();
> + conf.set("pfp.parameters", params.toString());
> + conf.set("mapred.compress.map.output", "true");
> + conf.set("mapred.output.compression.type", "BLOCK");
> + String input = params.get("output") + "/sortedoutput";
> Job job = new Job(conf, "PFP Growth Driver running over input" + input);
> job.setJarByClass(PFPGrowth.class);
>
> job.setMapOutputKeyClass(LongWritable.class);
> - job.setMapOutputValueClass(IntegerTuple.class);
> + job.setMapOutputValueClass(TransactionTree.class);
>
> job.setOutputKeyClass(Text.class);
> job.setOutputValueClass(TopKStringPatterns.class);
> @@ -358,8 +398,9 @@
> dfs.delete(outPath, true);
> }
>
> - job.setInputFormatClass(TextInputFormat.class);
> + job.setInputFormatClass(SequenceFileInputFormat.class);
> job.setMapperClass(ParallelFPGrowthMapper.class);
> + job.setCombinerClass(ParallelFPGrowthCombiner.class);
> job.setReducerClass(ParallelFPGrowthReducer.class);
> job.setOutputFormatClass(SequenceFileOutputFormat.class);
>
> @@ -374,11 +415,9 @@
> * @return Serialized String representation of List
> * @throws IOException
> */
> - private static String serializeList(List<Pair<String, Long>> list,
> - Configuration conf) throws IOException {
> - conf.set(
> - "io.serializations",
> - "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> + private static String serializeList(List<Pair<String, Long>> list, Configuration conf) throws IOException {
> + conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
> + + "org.apache.hadoop.io.serializer.WritableSerialization");
> DefaultStringifier<List<Pair<String, Long>>> listStringifier = new DefaultStringifier<List<Pair<String, Long>>>(
> conf, GenericsUtil.getClass(list));
> return listStringifier.toString(list);
> @@ -392,13 +431,11 @@
> * @return Serialized String representation of the GList Map
> * @throws IOException
> */
> - private static String serializeMap(Map<String, Long> map, Configuration conf)
> - throws IOException {
> - conf.set(
> - "io.serializations",
> - "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> - DefaultStringifier<Map<String, Long>> mapStringifier = new DefaultStringifier<Map<String, Long>>(
> - conf, GenericsUtil.getClass(map));
> + private static String serializeMap(Map<String, Long> map, Configuration conf) throws IOException {
> + conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
> + + "org.apache.hadoop.io.serializer.WritableSerialization");
> + DefaultStringifier<Map<String, Long>> mapStringifier = new DefaultStringifier<Map<String, Long>>(conf,
> + GenericsUtil.getClass(map));
> return mapStringifier.toString(map);
> }
> }
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java Thu Jan 7 16:45:37 2010
> @@ -1,5 +1,5 @@
> /**
> - * Licensed to the Apache Software Foundation (ASF) under one or more
> +w * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> @@ -17,46 +17,44 @@
>
> package org.apache.mahout.fpm.pfpgrowth;
>
> +import java.io.IOException;
> +import java.util.regex.Pattern;
> +
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.mahout.common.Parameters;
>
> -import java.io.IOException;
> -import java.util.regex.Pattern;
> -
> /**
> *
> - * {@link ParallelCountingMapper} maps all items in a particular transaction
> - * like the way it is done in Hadoop WordCount example
> + * {@link ParallelCountingMapper} maps all items in a particular transaction like the way it is done in Hadoop
> + * WordCount example
> *
> */
> -public class ParallelCountingMapper extends
> - Mapper<LongWritable, Text, Text, LongWritable> {
> +public class ParallelCountingMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
>
> private static final LongWritable one = new LongWritable(1);
>
> private Pattern splitter = null;
>
> @Override
> - protected void map(LongWritable offset, Text input, Context context)
> - throws IOException, InterruptedException {
> + protected void map(LongWritable offset, Text input, Context context) throws IOException,
> + InterruptedException {
>
> String[] items = splitter.split(input.toString());
> - for (String item : items){
> - if(item.trim().length()==0) continue;
> - context.setStatus("Parallel Counting Mapper: "+ item);
> + for (String item : items) {
> + if (item.trim().length() == 0) {
> + continue;
> + }
> + context.setStatus("Parallel Counting Mapper: " + item);
> context.write(new Text(item), one);
> }
> }
>
> @Override
> - protected void setup(Context context) throws IOException,
> - InterruptedException {
> + protected void setup(Context context) throws IOException, InterruptedException {
> super.setup(context);
> - Parameters params = Parameters.fromString(context.getConfiguration().get(
> - "pfp.parameters", ""));
> - splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER
> - .toString()));
> + Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
> + splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER.toString()));
> }
> }
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java Thu Jan 7 16:45:37 2010
> @@ -17,23 +17,21 @@
>
> package org.apache.mahout.fpm.pfpgrowth;
>
> +import java.io.IOException;
> +
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Reducer;
>
> -import java.io.IOException;
> -
> /**
> - * {@link ParallelCountingReducer} sums up the item count and output the item
> - * and the count This can also be used as a local Combiner. A simple summing
> - * reducer
> + * {@link ParallelCountingReducer} sums up the item count and outputs the item and the count. This can also be
> + * used as a local Combiner. A simple summing reducer.
> */
> -public class ParallelCountingReducer extends
> - Reducer<Text, LongWritable, Text, LongWritable> {
> +public class ParallelCountingReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
>
> @Override
> - protected void reduce(Text key, Iterable<LongWritable> values, Context context)
> - throws IOException, InterruptedException {
> + protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
> + InterruptedException {
> long sum = 0;
> for (LongWritable value : values) {
> context.setStatus("Parallel Counting Reducer :" + key);
>
> Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java?rev=896922&view=auto
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java (added)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java Thu Jan 7 16:45:37 2010
> @@ -0,0 +1,55 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.mahout.fpm.pfpgrowth;
> +
> +import java.io.IOException;
> +import java.util.Iterator;
> +import java.util.List;
> +
> +import org.apache.hadoop.io.LongWritable;
> +import org.apache.hadoop.mapreduce.Reducer;
> +import org.apache.mahout.common.Pair;
> +
> +/**
> + * {@link ParallelFPGrowthCombiner} takes each group of dependent transactions
> + * and compacts it into a TransactionTree structure
> + */
> +
> +public class ParallelFPGrowthCombiner extends
> + Reducer<LongWritable, TransactionTree, LongWritable, TransactionTree> {
> +
> + @Override
> + protected void reduce(LongWritable key, Iterable<TransactionTree> values,
> + Context context) throws IOException, InterruptedException {
> + TransactionTree cTree = new TransactionTree();
> + int count = 0;
> + int node = 0;
> + for (TransactionTree tr : values) {
> + Iterator<Pair<List<Integer>, Long>> it = tr.getIterator();
> + while (it.hasNext()) {
> + Pair<List<Integer>, Long> p = it.next();
> + node += cTree.addPattern(p.getFirst(), p.getSecond());
> + count++;
> + }
> + }
> +
> + context.write(key, cTree.getCompressedTree());
> +
> + }
> +
> +}
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java Thu Jan 7 16:45:37 2010
> @@ -17,93 +17,73 @@
>
> package org.apache.mahout.fpm.pfpgrowth;
>
> -import org.apache.hadoop.io.LongWritable;
> -import org.apache.hadoop.io.Text;
> -import org.apache.hadoop.mapreduce.Mapper;
> -import org.apache.mahout.common.IntegerTuple;
> -import org.apache.mahout.common.Pair;
> -import org.apache.mahout.common.Parameters;
> import java.io.IOException;
> -import java.util.ArrayList;
> -import java.util.Collections;
> import java.util.HashMap;
> import java.util.HashSet;
> +import java.util.Iterator;
> import java.util.List;
> import java.util.Map;
> -import java.util.Set;
> import java.util.Map.Entry;
> -import java.util.regex.Pattern;
> +import java.util.Set;
> +
> +import org.apache.hadoop.io.LongWritable;
> +import org.apache.hadoop.mapreduce.Mapper;
> +import org.apache.mahout.common.Pair;
> +import org.apache.mahout.common.Parameters;
>
> /**
> - * {@link ParallelFPGrowthMapper} maps each transaction to all unique items
> - * groups in the transaction. mapper outputs the group id as key and the
> - * transaction as value
> + * {@link ParallelFPGrowthMapper} maps each transaction to all unique item groups in the transaction. The mapper
> + * outputs the group id as key and the transaction as value
> *
> */
> public class ParallelFPGrowthMapper extends
> - Mapper<LongWritable, Text, LongWritable, IntegerTuple> {
> -
> - private final Map<String, Integer> fMap = new HashMap<String, Integer>();
> + Mapper<LongWritable, TransactionTree, LongWritable, TransactionTree> {
>
> private final Map<Integer, Long> gListInt = new HashMap<Integer, Long>();
>
> - private Pattern splitter = null;
> -
> @Override
> - protected void map(LongWritable offset, Text input, Context context)
> - throws IOException, InterruptedException {
> -
> - String[] items = splitter.split(input.toString());
> -
> - List<Integer> itemSet = new ArrayList<Integer>();
> - for (String item : items) // remove items not in the fList
> - {
> - if (fMap.containsKey(item) && item.trim().length() != 0)
> - itemSet.add(fMap.get(item));
> - }
> -
> - Collections.sort(itemSet);
> + protected void map(LongWritable offset, TransactionTree input, Context context) throws IOException,
> + InterruptedException {
>
> - Integer[] prunedItems = itemSet.toArray(new Integer[itemSet.size()]);
> -
> - Set<Long> groups = new HashSet<Long>();
> - for (int j = prunedItems.length - 1; j >= 0; j--) { // generate group
> - // dependent
> - // shards
> - Integer item = prunedItems[j];
> - Long groupID = gListInt.get(item);
> - if (groups.contains(groupID) == false) {
> - Integer[] tempItems = new Integer[j + 1];
> - System.arraycopy(prunedItems, 0, tempItems, 0, j + 1);
> - context
> - .setStatus("Parallel FPGrowth: Generating Group Dependent transactions for: "
> - + item);
> - context.write(new LongWritable(groupID), new IntegerTuple(tempItems));
> + Iterator<Pair<List<Integer>, Long>> it = input.getIterator();
> + while (it.hasNext()) {
> + Pair<List<Integer>, Long> pattern = it.next();
> + Integer[] prunedItems = pattern.getFirst().toArray(new Integer[pattern.getFirst().size()]);
> +
> + Set<Long> groups = new HashSet<Long>();
> + for (int j = prunedItems.length - 1; j >= 0; j--) { // generate group
> + // dependent
> + // shards
> + Integer item = prunedItems[j];
> + Long groupID = gListInt.get(item);
> +
> + if (groups.contains(groupID) == false) {
> + Integer[] tempItems = new Integer[j + 1];
> + System.arraycopy(prunedItems, 0, tempItems, 0, j + 1);
> + context.setStatus("Parallel FPGrowth: Generating Group Dependent transactions for: " + item);
> + context.write(new LongWritable(groupID), new TransactionTree(tempItems, pattern.getSecond()));
> + }
> + groups.add(groupID);
> }
> - groups.add(groupID);
> }
>
> }
>
> @Override
> - protected void setup(Context context) throws IOException,
> - InterruptedException {
> + protected void setup(Context context) throws IOException, InterruptedException {
> super.setup(context);
> - Parameters params = Parameters.fromString(context.getConfiguration().get(
> - "pfp.parameters", ""));
> + Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
>
> + Map<String, Integer> fMap = new HashMap<String, Integer>();
> int i = 0;
> - for (Pair<String, Long> e : PFPGrowth.deserializeList(params, "fList",
> - context.getConfiguration())) {
> + for (Pair<String, Long> e : PFPGrowth.deserializeList(params, "fList", context.getConfiguration())) {
> fMap.put(e.getFirst(), i++);
> }
> -
> - for (Entry<String, Long> e : PFPGrowth.deserializeMap(params, "gList",
> - context.getConfiguration()).entrySet()) {
> +
> + for (Entry<String, Long> e : PFPGrowth.deserializeMap(params, "gList", context.getConfiguration())
> + .entrySet()) {
> gListInt.put(fMap.get(e.getKey()), e.getValue());
> }
> - splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER
> - .toString()));
>
> }
> }
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java Thu Jan 7 16:45:37 2010
> @@ -17,27 +17,30 @@
>
> package org.apache.mahout.fpm.pfpgrowth;
>
> +import java.io.IOException;
> +import java.util.ArrayList;
> +import java.util.Collections;
> +import java.util.Comparator;
> +import java.util.HashMap;
> +import java.util.HashSet;
> +import java.util.Iterator;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.Map.Entry;
> +
> +import org.apache.commons.lang.mutable.MutableLong;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Reducer;
> -import org.apache.mahout.common.IntegerTuple;
> import org.apache.mahout.common.Pair;
> import org.apache.mahout.common.Parameters;
> +import org.apache.mahout.fpm.pfpgrowth.convertors.ContextStatusUpdater;
> import org.apache.mahout.fpm.pfpgrowth.convertors.ContextWriteOutputCollector;
> import org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerStringOutputConvertor;
> -import org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerTupleIterator;
> import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
> import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth;
> import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPTreeDepthCache;
>
> -import java.io.IOException;
> -import java.util.ArrayList;
> -import java.util.HashMap;
> -import java.util.HashSet;
> -import java.util.List;
> -import java.util.Map;
> -import java.util.Map.Entry;
> -
> /**
> * {@link ParallelFPGrowthReducer} takes each group of transactions and runs
> * Vanilla FPGrowth on it and outputs the Top K frequent Patterns for each
> @@ -46,14 +49,16 @@
> */
>
> public class ParallelFPGrowthReducer extends
> - Reducer<LongWritable, IntegerTuple, Text, TopKStringPatterns> {
> + Reducer<LongWritable, TransactionTree, Text, TopKStringPatterns> {
>
> private final List<Pair<Integer, Long>> fList = new ArrayList<Pair<Integer, Long>>();
> -
> +
> private final List<String> featureReverseMap = new ArrayList<String>();
> -
> +
> private final Map<String, Integer> fMap = new HashMap<String, Integer>();
>
> + private final List<String> fRMap = new ArrayList<String>();
> +
> private final Map<Long, List<Integer>> groupFeatures = new HashMap<Long, List<Integer>>();
>
> private int maxHeapSize = 50;
> @@ -61,48 +66,81 @@
> private int minSupport = 3;
>
> @Override
> - protected void reduce(LongWritable key, Iterable<IntegerTuple> values,
> + protected void reduce(LongWritable key, Iterable<TransactionTree> values,
> Context context) throws IOException {
> + TransactionTree cTree = new TransactionTree();
> + int nodes = 0;
> + for (TransactionTree tr : values) {
> + Iterator<Pair<List<Integer>, Long>> it = tr.getIterator();
> + while (it.hasNext()) {
> + Pair<List<Integer>, Long> p = it.next();
> + nodes += cTree.addPattern(p.getFirst(), p.getSecond());
> + }
> + }
> +
> + List<Pair<Integer, Long>> localFList = new ArrayList<Pair<Integer, Long>>();
> + for (Entry<Integer, MutableLong> fItem : cTree.generateFList().entrySet()) {
> + localFList.add(new Pair<Integer, Long>(fItem.getKey(), fItem.getValue()
> + .toLong()));
> +
> + }
> +
> + Collections.sort(localFList, new Comparator<Pair<Integer, Long>>() {
> +
> + @Override
> + public int compare(Pair<Integer, Long> o1, Pair<Integer, Long> o2) {
> + int ret = o2.getSecond().compareTo(o1.getSecond());
> + if (ret != 0) {
> + return ret;
> + }
> + return o1.getFirst().compareTo(o2.getFirst());
> + }
> +
> + });
> +
> +
> FPGrowth<Integer> fpGrowth = new FPGrowth<Integer>();
> fpGrowth
> .generateTopKFrequentPatterns(
> - new IntegerTupleIterator(values.iterator()),
> - fList,
> + cTree.getIterator(),
> + localFList,
> minSupport,
> maxHeapSize,
> new HashSet<Integer>(groupFeatures.get(key.get())),
> new IntegerStringOutputConvertor(
> - new ContextWriteOutputCollector<LongWritable, IntegerTuple, Text, TopKStringPatterns>(
> - context), featureReverseMap));
> + new ContextWriteOutputCollector<LongWritable, TransactionTree, Text, TopKStringPatterns>(
> + context), featureReverseMap),
> + new ContextStatusUpdater<LongWritable, TransactionTree, Text, TopKStringPatterns>(
> + context));
> }
>
> @Override
> - protected void setup(Context context) throws IOException, InterruptedException {
> + protected void setup(Context context) throws IOException,
> + InterruptedException {
>
> super.setup(context);
> Parameters params = Parameters.fromString(context.getConfiguration().get(
> "pfp.parameters", ""));
> -
> -
> -
> +
> int i = 0;
> - for(Pair<String, Long> e: PFPGrowth.deserializeList(params, "fList", context
> - .getConfiguration()))
> - {
> + for (Pair<String, Long> e : PFPGrowth.deserializeList(params, "fList",
> + context.getConfiguration())) {
> featureReverseMap.add(e.getFirst());
> fMap.put(e.getFirst(), i);
> + fRMap.add(e.getFirst());
> fList.add(new Pair<Integer, Long>(i++, e.getSecond()));
> +
> }
> -
> +
> Map<String, Long> gList = PFPGrowth.deserializeMap(params, "gList", context
> .getConfiguration());
> -
> +
> for (Entry<String, Long> entry : gList.entrySet()) {
> List<Integer> groupList = groupFeatures.get(entry.getValue());
> Integer itemInteger = fMap.get(entry.getKey());
> - if (groupList != null)
> + if (groupList != null) {
> groupList.add(itemInteger);
> - else {
> + } else {
> groupList = new ArrayList<Integer>();
> groupList.add(itemInteger);
> groupFeatures.put(entry.getValue(), groupList);
>
> Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java?rev=896922&view=auto
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java (added)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java Thu Jan 7 16:45:37 2010
> @@ -0,0 +1,85 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.mahout.fpm.pfpgrowth;
> +
> +import java.io.IOException;
> +import java.util.ArrayList;
> +import java.util.Arrays;
> +import java.util.Collections;
> +import java.util.HashMap;
> +import java.util.HashSet;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.Set;
> +import java.util.regex.Pattern;
> +
> +import org.apache.hadoop.io.LongWritable;
> +import org.apache.hadoop.io.Text;
> +import org.apache.hadoop.mapreduce.Mapper;
> +import org.apache.mahout.common.Pair;
> +import org.apache.mahout.common.Parameters;
> +
> +/**
> + * {@link TransactionSortingMapper} splits each transaction into its unique items, drops blank items and items
> + * not in the fList, sorts the remaining item ids and outputs them (support 1) keyed by the smallest item id
> + *
> + */
> +public class TransactionSortingMapper extends Mapper<LongWritable, Text, LongWritable, TransactionTree> {
> +
> + private final Map<String, Integer> fMap = new HashMap<String, Integer>();
> +
> + private Pattern splitter = null;
> +
> + @Override
> + protected void map(LongWritable offset, Text input, Context context) throws IOException,
> + InterruptedException {
> +
> + String[] items = splitter.split(input.toString());
> + Set<String> uniqueItems = new HashSet<String>(Arrays.asList(items));
> +
> + List<Integer> itemSet = new ArrayList<Integer>();
> + for (String item : uniqueItems) { // remove items not in the fList
> + if (fMap.containsKey(item) && item.trim().length() != 0) {
> + itemSet.add(fMap.get(item));
> + }
> + }
> +
> + Collections.sort(itemSet);
> +
> + Integer[] prunedItems = itemSet.toArray(new Integer[itemSet.size()]);
> +
> + if (prunedItems.length > 0) {
> + context.write(new LongWritable(prunedItems[0]), new TransactionTree(prunedItems, 1L));
> + }
> +
> + }
> +
> + @Override
> + protected void setup(Context context) throws IOException, InterruptedException {
> + super.setup(context);
> + Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
> +
> + int i = 0;
> + for (Pair<String, Long> e : PFPGrowth.deserializeList(params, "fList", context.getConfiguration())) {
> + fMap.put(e.getFirst(), i++);
> + }
> +
> + splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER.toString()));
> +
> + }
> +}
>
> Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java?rev=896922&view=auto
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java (added)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java Thu Jan 7 16:45:37 2010
> @@ -0,0 +1,42 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.mahout.fpm.pfpgrowth;
> +
> +import java.io.IOException;
> +
> +import org.apache.hadoop.io.LongWritable;
> +import org.apache.hadoop.mapreduce.Reducer;
> +/**
> + * {@link TransactionSortingReducer} writes every TransactionTree it receives
> + * back out unchanged, all under the same constant key (1), regardless of
> + * the input key.
> + *
> + */
> +
> +public class TransactionSortingReducer extends
> + Reducer<LongWritable, TransactionTree, LongWritable, TransactionTree> {
> +
> + private static final LongWritable one = new LongWritable(1);
> + @Override
> + protected void reduce(LongWritable key, Iterable<TransactionTree> values,
> + Context context) throws IOException, InterruptedException {
> + for (TransactionTree tr : values) {
> + context.write(one, tr);
> + }
> + }
> +}
>
> Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java?rev=896922&view=auto
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java (added)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java Thu Jan 7 16:45:37 2010
> @@ -0,0 +1,461 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.mahout.fpm.pfpgrowth;
> +
> +import java.io.DataInput;
> +import java.io.DataOutput;
> +import java.io.IOException;
> +import java.util.ArrayList;
> +import java.util.Arrays;
> +import java.util.Collections;
> +import java.util.Comparator;
> +import java.util.HashMap;
> +import java.util.Iterator;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.Stack;
> +
> +import org.apache.commons.lang.mutable.MutableLong;
> +import org.apache.hadoop.io.VIntWritable;
> +import org.apache.hadoop.io.VLongWritable;
> +import org.apache.hadoop.io.Writable;
> +import org.apache.mahout.common.Pair;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +public final class TransactionTree implements Writable {
> +
> + public final class TransactionTreeIterator implements Iterator<Pair<List<Integer>, Long>> {
> +
> + Stack<int[]> depth = new Stack<int[]>();
> +
> + public TransactionTreeIterator() {
> + depth.push(new int[] {0, -1});
> + }
> +
> + @Override
> + public boolean hasNext() {
> + if (depth.isEmpty()) {
> + return false;
> + }
> + return true;
> + }
> +
> + @Override
> + public Pair<List<Integer>, Long> next() {
> +
> + long sum = 0;
> + int childId = 0;
> + do {
> + int[] top = depth.peek();
> + while (top[1] + 1 == childCount[top[0]]) {
> + depth.pop();
> + top = depth.peek();
> + }
> + if (depth.isEmpty()) {
> + return null;
> + }
> + top[1]++;
> + childId = nodeChildren[top[0]][top[1]];
> + depth.push(new int[] {childId, -1});
> +
> + sum = 0;
> + for (int i = childCount[childId] - 1; i >= 0; i--) {
> + sum += nodeCount[nodeChildren[childId][i]];
> + }
> + } while (sum == nodeCount[childId]);
> +
> + List<Integer> data = new ArrayList<Integer>();
> + Iterator<int[]> it = depth.iterator();
> + it.next();
> + while (it.hasNext()) {
> + data.add(attribute[it.next()[0]]);
> + }
> +
> + Pair<List<Integer>, Long> returnable = new Pair<List<Integer>, Long>(data, nodeCount[childId] - sum);
> +
> + int[] top = depth.peek();
> + while (top[1] + 1 == childCount[top[0]]) {
> + depth.pop();
> + if (depth.isEmpty()) {
> + break;
> + }
> + top = depth.peek();
> + }
> + return returnable;
> + }
> +
> + @Override
> + public void remove() {
> + throw new UnsupportedOperationException();
> + }
> +
> + }
> +
> + private static final int DEFAULT_CHILDREN_INITIAL_SIZE = 2;
> +
> + private static final int DEFAULT_INITIAL_SIZE = 8;
> +
> + private static final float GROWTH_RATE = 1.5f;
> +
> + private static final Logger log = LoggerFactory.getLogger(TransactionTree.class);
> +
> + private static final int ROOTNODEID = 0;
> +
> + private int[] attribute;
> +
> + private int[] childCount;
> +
> + private int[][] nodeChildren;
> +
> + private long[] nodeCount;
> +
> + private int nodes = 0;
> +
> + private boolean representedAsList = false;
> +
> + private List<Pair<List<Integer>, Long>> transactionSet = new ArrayList<Pair<List<Integer>, Long>>();
> +
> + public TransactionTree() {
> + this(DEFAULT_INITIAL_SIZE);
> + representedAsList = false;
> + }
> +
> + public TransactionTree(int size) {
> + if (size < DEFAULT_INITIAL_SIZE) {
> + size = DEFAULT_INITIAL_SIZE;
> + }
> + childCount = new int[size];
> + attribute = new int[size];
> + nodeCount = new long[size];
> + nodeChildren = new int[size][];
> + createRootNode();
> + representedAsList = false;
> + }
> +
> + public TransactionTree(Integer[] items, Long support) {
> + representedAsList = true;
> + transactionSet.add(new Pair<List<Integer>, Long>(Arrays.asList(items), support));
> + }
> +
> + public TransactionTree(List<Pair<List<Integer>, Long>> transactionSet) {
> + representedAsList = true;
> + this.transactionSet = transactionSet;
> + }
> +
> + public void addChild(int parentNodeId, int childnodeId) {
> + int length = childCount[parentNodeId];
> + if (length >= nodeChildren[parentNodeId].length) {
> + resizeChildren(parentNodeId);
> + }
> + nodeChildren[parentNodeId][length++] = childnodeId;
> + childCount[parentNodeId] = length;
> +
> + }
> +
> + public boolean addCount(int nodeId, long nextNodeCount) {
> + if (nodeId < nodes) {
> + this.nodeCount[nodeId] += nextNodeCount;
> + return true;
> + }
> + return false;
> + }
> +
> + public int addPattern(List<Integer> myList, long addCount) {
> + int temp = TransactionTree.ROOTNODEID;
> + int ret = 0;
> + boolean addCountMode = true;
> + int child = -1;
> + for (int attributeValue : myList) {
> +
> + if (addCountMode) {
> + child = childWithAttribute(temp, attributeValue);
> + if (child == -1) {
> + addCountMode = false;
> + } else {
> + addCount(child, addCount);
> + temp = child;
> + }
> + }
> + if (!addCountMode) {
> + child = createNode(temp, attributeValue, addCount);
> + temp = child;
> + ret++;
> + }
> + }
> + return ret;
> + }
> +
> + public int attribute(int nodeId) {
> + return this.attribute[nodeId];
> + }
> +
> + public int childAtIndex(int nodeId, int index) {
> + if (childCount[nodeId] < index) {
> + return -1;
> + }
> + return nodeChildren[nodeId][index];
> + }
> +
> + public int childCount() {
> + int sum = 0;
> + for (int i = 0; i < nodes; i++) {
> + sum += childCount[i];
> + }
> + return sum;
> + }
> +
> + public int childCount(int nodeId) {
> + return childCount[nodeId];
> + }
> +
> + public int childWithAttribute(int nodeId, int childAttribute) {
> + int length = childCount[nodeId];
> + for (int i = 0; i < length; i++) {
> + if (attribute[nodeChildren[nodeId][i]] == childAttribute) {
> + return nodeChildren[nodeId][i];
> + }
> + }
> + return -1;
> + }
> +
> + public long count(int nodeId) {
> + return nodeCount[nodeId];
> + }
> +
> + public Map<Integer, MutableLong> generateFList() {
> + Map<Integer, MutableLong> frequencyList = new HashMap<Integer, MutableLong>();
> + Iterator<Pair<List<Integer>, Long>> it = getIterator();
> + int items = 0;
> + int count = 0;
> + while (it.hasNext()) {
> + Pair<List<Integer>, Long> p = it.next();
> + items += p.getFirst().size();
> + count++;
> + for (Integer i : p.getFirst()) {
> + if (frequencyList.containsKey(i) == false) {
> + frequencyList.put(i, new MutableLong(0));
> + }
> + frequencyList.get(i).add(p.getSecond());
> + }
> + }
> + return frequencyList;
> + }
> +
> + public TransactionTree getCompressedTree() {
> + TransactionTree ctree = new TransactionTree();
> + Iterator<Pair<List<Integer>, Long>> it = getIterator();
> + final Map<Integer, MutableLong> fList = generateFList();
> + int node = 0;
> + Comparator<Integer> comparator = new Comparator<Integer>() {
> +
> + @Override
> + public int compare(Integer o1, Integer o2) {
> + return fList.get(o2).compareTo(fList.get(o1));
> + }
> +
> + };
> + int size = 0;
> + List<Pair<List<Integer>, Long>> compressedTransactionSet = new ArrayList<Pair<List<Integer>, Long>>();
> + while (it.hasNext()) {
> + Pair<List<Integer>, Long> p = it.next();
> + Collections.sort(p.getFirst(), comparator);
> + compressedTransactionSet.add(p);
> + node += ctree.addPattern(p.getFirst(), p.getSecond());
> + size += p.getFirst().size() + 2;
> + }
> +
> + log.debug("Nodes in UnCompressed Tree: {} ", nodes);
> + log
> + .debug("UnCompressed Tree Size: {}", (this.nodes * 4 * 4 + this.childCount() * 4)
> + / (double) 1000000);
> + log.debug("Nodes in Compressed Tree: {} ", node);
> + log.debug("Compressed Tree Size: {}", (node * 4 * 4 + ctree.childCount() * 4) / (double) 1000000);
> + log.debug("TransactionSet Size: {}", (size * 4) / (double) 1000000);
> + if (node * 4 * 4 + ctree.childCount() * 4 <= size * 4) {
> + return ctree;
> + } else {
> + ctree = new TransactionTree(compressedTransactionSet);
> + return ctree;
> + }
> + }
> +
> + public Iterator<Pair<List<Integer>, Long>> getIterator() {
> + if (this.isTreeEmpty() && !representedAsList) {
> + throw new IllegalStateException("This is a bug. Please report this to mahout-user list");
> + } else if (representedAsList) {
> + return transactionSet.iterator();
> + } else {
> + return new TransactionTreeIterator();
> + }
> + }
> +
> + public boolean isTreeEmpty() {
> + return nodes <= 1;
> + }
> +
> + @Override
> + public void readFields(DataInput in) throws IOException {
> + representedAsList = in.readBoolean();
> +
> + VIntWritable vInt = new VIntWritable();
> + VLongWritable vLong = new VLongWritable();
> +
> + if (representedAsList) {
> + transactionSet = new ArrayList<Pair<List<Integer>, Long>>();
> + vInt.readFields(in);
> + int numTransactions = vInt.get();
> + for (int i = 0; i < numTransactions; i++) {
> + vLong.readFields(in);
> + Long support = vLong.get();
> +
> + vInt.readFields(in);
> + int length = vInt.get();
> +
> + Integer[] items = new Integer[length];
> + for (int j = 0; j < length; j++) {
> + vInt.readFields(in);
> + items[j] = vInt.get();
> + }
> + Pair<List<Integer>, Long> transaction = new Pair<List<Integer>, Long>(Arrays.asList(items), support);
> + transactionSet.add(transaction);
> + }
> + } else {
> + vInt.readFields(in);
> + nodes = vInt.get();
> + attribute = new int[nodes];
> + nodeCount = new long[nodes];
> + childCount = new int[nodes];
> + nodeChildren = new int[nodes][];
> + for (int i = 0; i < nodes; i++) {
> + vInt.readFields(in);
> + attribute[i] = vInt.get();
> + vLong.readFields(in);
> + nodeCount[i] = vLong.get();
> + vInt.readFields(in);
> + childCount[i] = vInt.get();
> + nodeChildren[i] = new int[childCount[i]];
> + for (int j = 0, k = childCount[i]; j < k; j++) {
> + vInt.readFields(in);
> + nodeChildren[i][j] = vInt.get();
> + }
> + }
> + }
> + }
> +
> + @Override
> + public void write(DataOutput out) throws IOException {
> + out.writeBoolean(representedAsList);
> + VIntWritable vInt = new VIntWritable();
> + VLongWritable vLong = new VLongWritable();
> + if (representedAsList) {
> + vInt.set(transactionSet.size());
> + vInt.write(out);
> + for (int i = 0, j = transactionSet.size(); i < j; i++) {
> + Pair<List<Integer>, Long> transaction = transactionSet.get(i);
> +
> + vLong.set(transaction.getSecond().longValue());
> + vLong.write(out);
> +
> + vInt.set(transaction.getFirst().size());
> + vInt.write(out);
> +
> + for (Integer item : transaction.getFirst()) {
> + vInt.set(item);
> + vInt.write(out);
> + }
> + }
> + } else {
> + vInt.set(nodes);
> + vInt.write(out);
> + for (int i = 0; i < nodes; i++) {
> + vInt.set(attribute[i]);
> + vInt.write(out);
> + vLong.set(nodeCount[i]);
> + vLong.write(out);
> + vInt.set(childCount[i]);
> + vInt.write(out);
> + for (int j = 0, k = childCount[i]; j < k; j++) {
> + vInt.set(nodeChildren[i][j]);
> + vInt.write(out);
> + }
> + }
> + }
> + }
> +
> + private int createNode(int parentNodeId, int attributeValue, long count) {
> + if (nodes >= this.attribute.length) {
> + resize();
> + }
> +
> + childCount[nodes] = 0;
> + this.attribute[nodes] = attributeValue;
> + nodeCount[nodes] = count;
> + if (nodeChildren[nodes] == null) {
> + nodeChildren[nodes] = new int[DEFAULT_CHILDREN_INITIAL_SIZE];
> + }
> +
> + int childNodeId = nodes++;
> + addChild(parentNodeId, childNodeId);
> + return childNodeId;
> + }
> +
> + private int createRootNode() {
> + childCount[nodes] = 0;
> + attribute[nodes] = -1;
> + nodeCount[nodes] = 0;
> + if (nodeChildren[nodes] == null) {
> + nodeChildren[nodes] = new int[DEFAULT_CHILDREN_INITIAL_SIZE];
> + }
> + int childNodeId = nodes++;
> + return childNodeId;
> + }
> +
> + private void resize() {
> + int size = (int) (GROWTH_RATE * nodes);
> + if (size < DEFAULT_INITIAL_SIZE) {
> + size = DEFAULT_INITIAL_SIZE;
> + }
> +
> + int[] oldChildCount = childCount;
> + int[] oldAttribute = attribute;
> + long[] oldnodeCount = nodeCount;
> + int[][] oldNodeChildren = nodeChildren;
> +
> + childCount = new int[size];
> + attribute = new int[size];
> + nodeCount = new long[size];
> + nodeChildren = new int[size][];
> +
> + System.arraycopy(oldChildCount, 0, this.childCount, 0, nodes);
> + System.arraycopy(oldAttribute, 0, this.attribute, 0, nodes);
> + System.arraycopy(oldnodeCount, 0, this.nodeCount, 0, nodes);
> + System.arraycopy(oldNodeChildren, 0, this.nodeChildren, 0, nodes);
> + }
> +
> + private void resizeChildren(int nodeId) {
> + int length = childCount[nodeId];
> + int size = (int) (GROWTH_RATE * length);
> + if (size < DEFAULT_CHILDREN_INITIAL_SIZE) {
> + size = DEFAULT_CHILDREN_INITIAL_SIZE;
> + }
> + int[] oldNodeChildren = nodeChildren[nodeId];
> + nodeChildren[nodeId] = new int[size];
> + System.arraycopy(oldNodeChildren, 0, this.nodeChildren[nodeId], 0, length);
> + }
> +}
>
> Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java?rev=896922&view=auto
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java (added)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java Thu Jan 7 16:45:37 2010
> @@ -0,0 +1,46 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.mahout.fpm.pfpgrowth.convertors;
> +
> +import org.apache.hadoop.io.Writable;
> +import org.apache.hadoop.mapreduce.Reducer;
> +
> +public class ContextStatusUpdater<IK extends Writable, IV extends Writable,
> + K extends Writable, V extends Writable> implements StatusUpdater {
> +
> + private static final long PERIOD = 10000; // Update every 10 seconds
> +
> + private final Reducer<IK, IV, K, V>.Context context;
> +
> + private long time = System.currentTimeMillis();
> +
> + public ContextStatusUpdater(Reducer<IK, IV, K, V>.Context context) {
> + this.context = context;
> + }
> +
> + @Override
> + public void update(String status) {
> + long curTime = System.currentTimeMillis();
> + if (curTime - time > PERIOD && context != null) {
> + time = curTime;
> + context.setStatus("Processing FPTree: " + status);
> + }
> +
> + }
> +
> +}
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java Thu Jan 7 16:45:37 2010
> @@ -25,8 +25,8 @@
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> -public class ContextWriteOutputCollector<IK extends Writable, IV extends Writable, K extends Writable, V extends Writable>
> - implements OutputCollector<K, V> {
> +public class ContextWriteOutputCollector<IK extends Writable, IV extends Writable,
> + K extends Writable, V extends Writable> implements OutputCollector<K, V> {
>
> private static final Logger log = LoggerFactory
> .getLogger(ContextWriteOutputCollector.class);
>
> Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java?rev=896922&view=auto
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java (added)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java Thu Jan 7 16:45:37 2010
> @@ -0,0 +1,23 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.mahout.fpm.pfpgrowth.convertors;
> +
> +public interface StatusUpdater {
> +
> + void update(String status);
> +}
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java Thu Jan 7 16:45:37 2010
> @@ -19,40 +19,46 @@
>
> import java.io.IOException;
> import java.util.ArrayList;
> +import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> +import java.util.PriorityQueue;
> +
> import org.apache.hadoop.mapred.OutputCollector;
> import org.apache.mahout.common.Pair;
> import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FrequentPatternMaxHeap;
> import org.apache.mahout.fpm.pfpgrowth.fpgrowth.Pattern;
>
> -public final class TopKPatternsOutputConvertor<A> implements
> +public final class TopKPatternsOutputConvertor<A extends Comparable<? super A>> implements
> OutputCollector<Integer, FrequentPatternMaxHeap> {
>
> private OutputCollector<A, List<Pair<List<A>, Long>>> collector = null;
>
> private Map<Integer, A> reverseMapping = null;
>
> - public TopKPatternsOutputConvertor(
> - OutputCollector<A, List<Pair<List<A>, Long>>> collector,
> + public TopKPatternsOutputConvertor(OutputCollector<A, List<Pair<List<A>, Long>>> collector,
> Map<Integer, A> reverseMapping) {
> this.collector = collector;
> this.reverseMapping = reverseMapping;
> }
>
> @Override
> - public void collect(Integer key, FrequentPatternMaxHeap value)
> - throws IOException {
> + public void collect(Integer key, FrequentPatternMaxHeap value) throws IOException {
> List<Pair<List<A>, Long>> perAttributePatterns = new ArrayList<Pair<List<A>, Long>>();
> - for (Pattern itemSet : value.getHeap()) {
> + PriorityQueue<Pattern> t = value.getHeap();
> + while (t.size() > 0) {
> + Pattern itemSet = t.poll();
> List<A> frequentPattern = new ArrayList<A>();
> for (int j = 0; j < itemSet.length(); j++) {
> frequentPattern.add(reverseMapping.get(itemSet.getPattern()[j]));
> }
> - Pair<List<A>, Long> returnItemSet = new Pair<List<A>, Long>(
> - frequentPattern, itemSet.support());
> + Collections.sort(frequentPattern);
> +
> + Pair<List<A>, Long> returnItemSet = new Pair<List<A>, Long>(frequentPattern, itemSet.support());
> perAttributePatterns.add(returnItemSet);
> }
> + Collections.reverse(perAttributePatterns);
> +
> collector.collect(reverseMapping.get(key), perAttributePatterns);
> }
> }
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java Thu Jan 7 16:45:37 2010
> @@ -21,14 +21,16 @@
> import java.util.List;
> import java.util.Map;
>
> -public class TransactionIterator<AP> implements Iterator<int[]> {
> +import org.apache.mahout.common.Pair;
> +
> +public class TransactionIterator<AP> implements Iterator<Pair<int[], Long>> {
> private Map<AP, Integer> attributeIdMapping = null;
>
> - private Iterator<List<AP>> iterator = null;
> + private Iterator<Pair<List<AP>, Long>> iterator = null;
>
> private int[] transactionBuffer = null;
>
> - public TransactionIterator(Iterator<List<AP>> iterator,
> + public TransactionIterator(Iterator<Pair<List<AP>, Long>> iterator,
> Map<AP, Integer> attributeIdMapping) {
> this.attributeIdMapping = attributeIdMapping;
> this.iterator = iterator;
> @@ -41,18 +43,19 @@
> }
>
> @Override
> - public final int[] next() {
> - List<AP> transaction = iterator.next();
> + public final Pair<int[], Long> next() {
> + Pair<List<AP>, Long> transaction = iterator.next();
> int index = 0;
> - for (AP Attribute : transaction) {
> - if (attributeIdMapping.containsKey(Attribute)) {
> - transactionBuffer[index++] = attributeIdMapping.get(Attribute);
> +
> + for (AP attribute : transaction.getFirst()) {
> + if (attributeIdMapping.containsKey(attribute)) {
> + transactionBuffer[index++] = attributeIdMapping.get(attribute);
> }
> }
>
> int[] transactionList = new int[index];
> System.arraycopy(transactionBuffer, 0, transactionList, 0, index);
> - return transactionList;
> + return new Pair<int[], Long>(transactionList, transaction.getSecond());
>
> }
>
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java Thu Jan 7 16:45:37 2010
> @@ -42,7 +42,7 @@
> @Override
> public void collect(Integer key, List<Pair<List<Integer>, Long>> value)
> throws IOException {
> - String StringKey = featureReverseMap.get(key);
> + String stringKey = featureReverseMap.get(key);
> List<Pair<List<String>, Long>> stringValues = new ArrayList<Pair<List<String>, Long>>();
> for (Pair<List<Integer>, Long> e : value) {
> List<String> pattern = new ArrayList<String>();
> @@ -53,7 +53,7 @@
> }
>
> collector
> - .collect(new Text(StringKey), new TopKStringPatterns(stringValues));
> + .collect(new Text(stringKey), new TopKStringPatterns(stringValues));
> }
>
> }
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java Thu Jan 7 16:45:37 2010
> @@ -25,7 +25,7 @@
> public final class IntegerTupleIterator implements Iterator<List<Integer>> {
>
> private Iterator<IntegerTuple> iterator = null;
> -
> +
> public IntegerTupleIterator(Iterator<IntegerTuple> iterator) {
> this.iterator = iterator;
> }
>
> Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
> URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java?rev=896922&r1=896921&r2=896922&view=diff
> ==============================================================================
> --- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java (original)
> +++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java Thu Jan 7 16:45:37 2010
> @@ -55,10 +55,12 @@
> Pair<List<String>, Long> myItem = null;
> Pair<List<String>, Long> otherItem = null;
> for (int i = 0; i < heapSize; i++) {
> - if (myItem == null && myIterator.hasNext())
> + if (myItem == null && myIterator.hasNext()) {
> myItem = myIterator.next();
> - if (otherItem == null && otherIterator.hasNext())
> + }
> + if (otherItem == null && otherIterator.hasNext()) {
> otherItem = otherIterator.next();
> + }
> if (myItem != null && otherItem != null) {
> int cmp = myItem.getSecond().compareTo(otherItem.getSecond());
> if (cmp == 0) {
> @@ -89,8 +91,9 @@
> } else if (otherItem != null) {
> patterns.add(otherItem);
> otherItem = null;
> - } else
> + } else {
> break;
> + }
> }
> return new TopKStringPatterns(patterns);
> }
>
>
>
Re: svn commit: r896922 [1/3] - in /lucene/mahout/trunk:
core/src/main/java/org/apache/mahout/common/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/
core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/
core/src/main/java/org/apache/mah
Posted by deneche abdelhakim <ad...@gmail.com>.
The build is successful, thanks =D
On Fri, Jan 8, 2010 at 9:23 AM, Robin Anil <ro...@gmail.com> wrote:
> Try Now
>
Re: svn commit: r896922 [1/3] - in /lucene/mahout/trunk:
core/src/main/java/org/apache/mahout/common/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/
core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/
core/src/main/java/org/apache/mah
Posted by Robin Anil <ro...@gmail.com>.
Try Now
Re: svn commit: r896922 [1/3] - in /lucene/mahout/trunk:
core/src/main/java/org/apache/mahout/common/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/
core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/
core/src/main/java/org/apache/mah
Posted by Robin Anil <ro...@gmail.com>.
Let me take a look. It seems the reducer was not checked in.
On Fri, Jan 8, 2010 at 1:40 PM, deneche abdelhakim <ad...@gmail.com> wrote:
> Hi Robin,
>
> I'm getting a build failure with "mvn clean install":
>
> [INFO] Compilation failure
>
> /home/hakim/mahout-trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleGrouper.java:[89,24]
> cannot find symbol
> symbol : class KeyBasedStringTupleReducer
> location: class
> org.apache.mahout.fpm.pfpgrowth.example.dataset.KeyBasedStringTupleGrouper
>
>
>
> On Thu, Jan 7, 2010 at 5:48 PM, <ro...@apache.org> wrote:
> > Author: robinanil
> > Date: Thu Jan 7 16:45:37 2010
> > New Revision: 896922
> >
> > URL: http://svn.apache.org/viewvc?rev=896922&view=rev
> > Log:
> > MAHOUT-221 FP-Bonsai Pruning and Transaction Compaction
> >
> > Added:
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java
> >
> lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/TransactionTreeTest.java
> >
> lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/
> >
> lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/DeliciousTagsExample.java
> >
> lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/
> >
> lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleCombiner.java
> >
> lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleGrouper.java
> >
> lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/example/dataset/KeyBasedStringTupleMapper.java
> > Modified:
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPGrowth.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTree.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTreeDepthCache.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FrequentPatternMaxHeap.java
> >
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/Pattern.java
> >
> lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthTest.java
> >
> lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
> >
> lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthJob.java
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
> Thu Jan 7 16:45:37 2010
> > @@ -22,31 +22,31 @@
> > import java.util.List;
> > import java.util.regex.Pattern;
> >
> > -public class StringRecordIterator implements Iterator<List<String>> {
> > -
> > +public class StringRecordIterator implements
> Iterator<Pair<List<String>,Long>> {
> > +
> > private final Iterator<String> lineIterator;
> > private Pattern splitter = null;
> > -
> > +
> > public StringRecordIterator(FileLineIterable iterable, String pattern)
> {
> > this.lineIterator = iterable.iterator();
> > this.splitter = Pattern.compile(pattern);
> > }
> > -
> > +
> > @Override
> > public boolean hasNext() {
> > return lineIterator.hasNext();
> > }
> > -
> > +
> > @Override
> > - public List<String> next() {
> > + public Pair<List<String>,Long> next() {
> > String line = lineIterator.next();
> > String[] items = splitter.split(line);
> > - return Arrays.asList(items);
> > + return new Pair<List<String>,Long>(Arrays.asList(items),
> Long.valueOf(1));
> > }
> > -
> > +
> > @Override
> > public void remove() {
> > lineIterator.remove();
> > }
> > -
> > -}
> > +
> > +}
> > \ No newline at end of file
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
> Thu Jan 7 16:45:37 2010
> > @@ -17,34 +17,32 @@
> >
> > package org.apache.mahout.fpm.pfpgrowth;
> >
> > +import java.io.IOException;
> > +import java.util.ArrayList;
> > +import java.util.List;
> > +
> > import org.apache.hadoop.io.Text;
> > import org.apache.hadoop.mapreduce.Mapper;
> > import org.apache.mahout.common.Pair;
> > import
> org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
> >
> > -import java.io.IOException;
> > -import java.util.ArrayList;
> > -import java.util.List;
> > -
> > /**
> > *
> > - * {@link AggregatorMapper} outputs the pattern for each item in the
> pattern, so
> > - * that reducer can group them and select the top K frequent patterns
> > + * {@link AggregatorMapper} outputs the pattern for each item in the
> pattern, so that reducer can group them
> > + * and select the top K frequent patterns
> > *
> > */
> > -public class AggregatorMapper extends
> > - Mapper<Text, TopKStringPatterns, Text, TopKStringPatterns> {
> > +public class AggregatorMapper extends Mapper<Text, TopKStringPatterns,
> Text, TopKStringPatterns> {
> >
> > @Override
> > - protected void map(Text key, TopKStringPatterns values, Context
> context)
> > - throws IOException, InterruptedException {
> > + protected void map(Text key, TopKStringPatterns values, Context
> context) throws IOException,
> > + InterruptedException {
> > for (Pair<List<String>, Long> pattern : values.getPatterns()) {
> > for (String item : pattern.getFirst()) {
> > List<Pair<List<String>, Long>> patternSingularList = new
> ArrayList<Pair<List<String>, Long>>();
> > patternSingularList.add(pattern);
> > - context.setStatus("Aggregator Mapper:Grouping Patterns for
> "+item);
> > - context.write(new Text(item),
> > - new TopKStringPatterns(patternSingularList));
> > + context.setStatus("Aggregator Mapper:Grouping Patterns for " +
> item);
> > + context.write(new Text(item), new
> TopKStringPatterns(patternSingularList));
> > }
> > }
> >
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
> Thu Jan 7 16:45:37 2010
> > @@ -17,11 +17,12 @@
> >
> > package org.apache.mahout.fpm.pfpgrowth;
> >
> > +import java.io.IOException;
> > +
> > import org.apache.hadoop.io.Text;
> > import org.apache.hadoop.mapreduce.Reducer;
> > import org.apache.mahout.common.Parameters;
> > import
> org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
> > -import java.io.IOException;
> >
> > /**
> > *
> >
> > Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java?rev=896922&view=auto
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java
> (added)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/MultiTransactionTreeIterator.java
> Thu Jan 7 16:45:37 2010
> > @@ -0,0 +1,70 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.mahout.fpm.pfpgrowth;
> > +
> > +import java.util.Iterator;
> > +import java.util.List;
> > +
> > +import org.apache.mahout.common.Pair;
> > +
> > +public final class MultiTransactionTreeIterator implements
> Iterator<List<Integer>> {
> > +
> > + private Iterator<Pair<List<Integer>, Long>> pIterator = null;
> > +
> > + private Pair<List<Integer>, Long> currentPattern = null;
> > +
> > + private long currentCount = 0;
> > +
> > + public MultiTransactionTreeIterator(Iterator<Pair<List<Integer>,
> Long>> iterator) {
> > + this.pIterator = iterator;
> > +
> > + if (pIterator.hasNext()) {
> > + currentPattern = pIterator.next();
> > + currentCount = 0;
> > + } else {
> > + pIterator = null;
> > + }
> > +
> > + }
> > +
> > + @Override
> > + public boolean hasNext() {
> > + return pIterator != null;
> > + }
> > +
> > + @Override
> > + public List<Integer> next() {
> > + List<Integer> returnable = currentPattern.getFirst();
> > + currentCount++;
> > + if (currentCount == currentPattern.getSecond().longValue()) {
> > + if (pIterator.hasNext()) {
> > + currentPattern = pIterator.next();
> > + currentCount = 0;
> > + } else {
> > + pIterator = null;
> > + }
> > + }
> > + return returnable;
> > + }
> > +
> > + @Override
> > + public void remove() {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > +}
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
> Thu Jan 7 16:45:37 2010
> > @@ -42,7 +42,6 @@
> > import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> > import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
> > import org.apache.hadoop.util.GenericsUtil;
> > -import org.apache.mahout.common.IntegerTuple;
> > import org.apache.mahout.common.Pair;
> > import org.apache.mahout.common.Parameters;
> > import
> org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
> > @@ -52,11 +51,11 @@
> >
> > /**
> > *
> > - * Parallel FP Growth Driver Class. Runs each stage of PFPGrowth as
> described in
> > - * the paper http://infolab.stanford.edu/~echang/recsys08-69.pdf
> > + * Parallel FP Growth Driver Class. Runs each stage of PFPGrowth as
> described in the paper
> > + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
> > *
> > */
> > -public class PFPGrowth {
> > +public final class PFPGrowth {
> > public static final Pattern SPLITTER = Pattern.compile("[ ,\t]*[,|\t][
> ,\t]*");
> >
> > private static final Logger log =
> LoggerFactory.getLogger(PFPGrowth.class);
> > @@ -73,12 +72,11 @@
> > * @return Deserialized Feature Frequency List
> > * @throws IOException
> > */
> > - public static List<Pair<String, Long>> deserializeList(Parameters
> params,
> > - String key, Configuration conf) throws IOException {
> > + public static List<Pair<String, Long>> deserializeList(Parameters
> params, String key, Configuration conf)
> > + throws IOException {
> > List<Pair<String, Long>> list = new ArrayList<Pair<String, Long>>();
> > - conf.set(
> > - "io.serializations",
> > -
> "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> > + conf.set("io.serializations",
> "org.apache.hadoop.io.serializer.JavaSerialization,"
> > + + "org.apache.hadoop.io.serializer.WritableSerialization");
> >
> > DefaultStringifier<List<Pair<String, Long>>> listStringifier = new
> DefaultStringifier<List<Pair<String, Long>>>(
> > conf, GenericsUtil.getClass(list));
> > @@ -89,8 +87,8 @@
> > }
> >
> > /**
> > - * Generates the gList(Group ID Mapping of Various frequent Features)
> Map from
> > - * the corresponding serialized representation
> > + * Generates the gList(Group ID Mapping of Various frequent Features)
> Map from the corresponding serialized
> > + * representation
> > *
> > * @param params
> > * @param key
> > @@ -98,16 +96,14 @@
> > * @return Deserialized Group List
> > * @throws IOException
> > */
> > - public static Map<String, Long> deserializeMap(Parameters params,
> String key,
> > - Configuration conf) throws IOException {
> > + public static Map<String, Long> deserializeMap(Parameters params,
> String key, Configuration conf)
> > + throws IOException {
> > Map<String, Long> map = new HashMap<String, Long>();
> > - conf
> > - .set(
> > - "io.serializations",
> > -
> "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> > + conf.set("io.serializations",
> "org.apache.hadoop.io.serializer.JavaSerialization,"
> > + + "org.apache.hadoop.io.serializer.WritableSerialization");
> >
> > - DefaultStringifier<Map<String, Long>> mapStringifier = new
> DefaultStringifier<Map<String, Long>>(
> > - conf, GenericsUtil.getClass(map));
> > + DefaultStringifier<Map<String, Long>> mapStringifier = new
> DefaultStringifier<Map<String, Long>>(conf,
> > + GenericsUtil.getClass(map));
> > String gListString = mapStringifier.toString(map);
> > gListString = params.get(key, gListString);
> > map = mapStringifier.fromString(gListString);
> > @@ -115,33 +111,30 @@
> > }
> >
> > /**
> > - * read the feature frequency List which is built at the end of the
> Parallel
> > - * counting job
> > + * read the feature frequency List which is built at the end of the
> Parallel counting job
> > *
> > * @param params
> > * @return Feature Frequency List
> > * @throws IOException
> > */
> > - public static List<Pair<String, Long>> readFList(Parameters params)
> > - throws IOException {
> > + public static List<Pair<String, Long>> readFList(Parameters params)
> throws IOException {
> > Writable key = new Text();
> > LongWritable value = new LongWritable();
> > int minSupport = Integer.valueOf(params.get("minSupport", "3"));
> > Configuration conf = new Configuration();
> >
> > - FileSystem fs = FileSystem.get(new Path(params.get("output")
> > - + "/parallelcounting").toUri(), conf);
> > - FileStatus[] outputFiles = fs.globStatus(new
> Path(params.get("output")
> > - + "/parallelcounting/part-*"));
> > + FileSystem fs = FileSystem.get(new Path(params.get("output") +
> "/parallelcounting").toUri(), conf);
> > + FileStatus[] outputFiles = fs.globStatus(new
> Path(params.get("output") + "/parallelcounting/part-*"));
> >
> > - PriorityQueue<Pair<String, Long>> queue = new
> PriorityQueue<Pair<String, Long>>(
> > - 11, new Comparator<Pair<String, Long>>() {
> > + PriorityQueue<Pair<String, Long>> queue = new
> PriorityQueue<Pair<String, Long>>(11,
> > + new Comparator<Pair<String, Long>>() {
> >
> > @Override
> > public int compare(Pair<String, Long> o1, Pair<String, Long>
> o2) {
> > int ret = o2.getSecond().compareTo(o1.getSecond());
> > - if (ret != 0)
> > + if (ret != 0) {
> > return ret;
> > + }
> > return o1.getFirst().compareTo(o2.getFirst());
> > }
> >
> > @@ -151,14 +144,16 @@
> > SequenceFile.Reader reader = new SequenceFile.Reader(fs, path,
> conf);
> > // key is feature value is count
> > while (reader.next(key, value)) {
> > - if (value.get() < minSupport)
> > + if (value.get() < minSupport) {
> > continue;
> > + }
> > queue.add(new Pair<String, Long>(key.toString(), value.get()));
> > }
> > }
> > List<Pair<String, Long>> fList = new ArrayList<Pair<String, Long>>();
> > - while (queue.isEmpty() == false)
> > + while (queue.isEmpty() == false) {
> > fList.add(queue.poll());
> > + }
> > return fList;
> > }
> >
> > @@ -169,15 +164,13 @@
> > * @return List of TopK patterns for each string frequent feature
> > * @throws IOException
> > */
> > - public static List<Pair<String, TopKStringPatterns>>
> readFrequentPattern(
> > - Parameters params) throws IOException {
> > + public static List<Pair<String, TopKStringPatterns>>
> readFrequentPattern(Parameters params)
> > + throws IOException {
> >
> > Configuration conf = new Configuration();
> >
> > - FileSystem fs = FileSystem.get(new Path(params.get("output")
> > - + "/frequentPatterns").toUri(), conf);
> > - FileStatus[] outputFiles = fs.globStatus(new
> Path(params.get("output")
> > - + "/frequentPatterns/part-*"));
> > + FileSystem fs = FileSystem.get(new Path(params.get("output") +
> "/frequentPatterns").toUri(), conf);
> > + FileStatus[] outputFiles = fs.globStatus(new
> Path(params.get("output") + "/frequentPatterns/part-*"));
> >
> > List<Pair<String, TopKStringPatterns>> ret = new
> ArrayList<Pair<String, TopKStringPatterns>>();
> > for (FileStatus fileStatus : outputFiles) {
> > @@ -189,42 +182,42 @@
> >
> > /**
> > *
> > - * @param params params should contain input and output locations as a
> string
> > - * value, the additional parameters include minSupport(3),
> > - * maxHeapSize(50), numGroups(1000)
> > + * @param params params should contain input and output locations as a
> string value, the additional
> > + * parameters include minSupport(3), maxHeapSize(50),
> numGroups(1000)
> > * @throws IOException
> > * @throws ClassNotFoundException
> > * @throws InterruptedException
> > */
> > - public static void runPFPGrowth(Parameters params) throws IOException,
> > - InterruptedException, ClassNotFoundException {
> > + public static void runPFPGrowth(Parameters params) throws IOException,
> InterruptedException,
> > + ClassNotFoundException {
> > startParallelCounting(params);
> > startGroupingItems(params);
> > + startTransactionSorting(params);
> > startParallelFPGrowth(params);
> > startAggregating(params);
> > }
> >
> > /**
> > - * Run the aggregation Job to aggregate the different TopK patterns
> and group
> > - * each Pattern by the features present in it and thus calculate the
> final Top
> > - * K frequent Patterns for each feature
> > + * Run the aggregation Job to aggregate the different TopK patterns
> and group each Pattern by the features
> > + * present in it and thus calculate the final Top K frequent Patterns
> for each feature
> > *
> > * @param params
> > * @throws IOException
> > * @throws InterruptedException
> > * @throws ClassNotFoundException
> > */
> > - public static void startAggregating(Parameters params) throws
> IOException,
> > - InterruptedException, ClassNotFoundException {
> > + public static void startAggregating(Parameters params) throws
> IOException, InterruptedException,
> > + ClassNotFoundException {
> >
> > Configuration conf = new Configuration();
> > params.set("fList", "");
> > params.set("gList", "");
> > conf.set("pfp.parameters", params.toString());
> > + conf.set("mapred.compress.map.output", "true");
> > + conf.set("mapred.output.compression.type", "BLOCK");
> >
> > String input = params.get("output") + "/fpgrowth";
> > - Job job = new Job(conf, "PFP Aggregator Driver running over input: "
> > - + input);
> > + Job job = new Job(conf, "PFP Aggregator Driver running over input: "
> + input);
> > job.setJarByClass(PFPGrowth.class);
> >
> > job.setOutputKeyClass(Text.class);
> > @@ -248,8 +241,7 @@
> > }
> >
> > /**
> > - * Group the given Features into g groups as defined by the numGroups
> > - * parameter in params
> > + * Group the given Features into g groups as defined by the numGroups
> parameter in params
> > *
> > * @param params
> > * @throws IOException
> > @@ -261,14 +253,15 @@
> >
> > Map<String, Long> gList = new HashMap<String, Long>();
> > long maxPerGroup = fList.size() / numGroups;
> > - if (fList.size() != maxPerGroup * numGroups)
> > + if (fList.size() != maxPerGroup * numGroups) {
> > maxPerGroup++;
> > + }
> >
> > long i = 0;
> > long groupID = 0;
> > for (Pair<String, Long> featureFreq : fList) {
> > String feature = featureFreq.getFirst();
> > - if (i / (maxPerGroup) == groupID) {
> > + if (i / maxPerGroup == groupID) {
> > gList.put(feature, groupID);
> > } else {
> > groupID++;
> > @@ -291,15 +284,17 @@
> > * @throws InterruptedException
> > * @throws ClassNotFoundException
> > */
> > - public static void startParallelCounting(Parameters params)
> > - throws IOException, InterruptedException, ClassNotFoundException {
> > + public static void startParallelCounting(Parameters params) throws
> IOException, InterruptedException,
> > + ClassNotFoundException {
> >
> > Configuration conf = new Configuration();
> > conf.set("pfp.parameters", params.toString());
> >
> > + conf.set("mapred.compress.map.output", "true");
> > + conf.set("mapred.output.compression.type", "BLOCK");
> > +
> > String input = params.get("input");
> > - Job job = new Job(conf, "Parallel Counting Driver running over
> input: "
> > - + input);
> > + Job job = new Job(conf, "Parallel Counting Driver running over
> input: " + input);
> > job.setJarByClass(PFPGrowth.class);
> >
> > job.setOutputKeyClass(Text.class);
> > @@ -325,26 +320,71 @@
> > }
> >
> > /**
> > - * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K
> features of
> > - * group dependent shards
> > + * Run the Transaction Sorting Map/Reduce Job, which converts the input
> transactions into TransactionTree records written under output/sortedoutput
> > *
> > * @param params
> > * @throws IOException
> > * @throws InterruptedException
> > * @throws ClassNotFoundException
> > */
> > - public static void startParallelFPGrowth(Parameters params)
> > - throws IOException, InterruptedException, ClassNotFoundException {
> > + public static void startTransactionSorting(Parameters params) throws
> IOException, InterruptedException,
> > + ClassNotFoundException {
> >
> > Configuration conf = new Configuration();
> > + String gList = params.get("gList");
> > + params.set("gList", "");
> > conf.set("pfp.parameters", params.toString());
> > -
> > + conf.set("mapred.compress.map.output", "true");
> > + conf.set("mapred.output.compression.type", "BLOCK");
> > String input = params.get("input");
> > + Job job = new Job(conf, "PFP Transaction Sorting running over input"
> + input);
> > + job.setJarByClass(PFPGrowth.class);
> > +
> > + job.setMapOutputKeyClass(LongWritable.class);
> > + job.setMapOutputValueClass(TransactionTree.class);
> > +
> > + job.setOutputKeyClass(LongWritable.class);
> > + job.setOutputValueClass(TransactionTree.class);
> > +
> > + FileInputFormat.addInputPath(job, new Path(input));
> > + Path outPath = new Path(params.get("output") + "/sortedoutput");
> > + FileOutputFormat.setOutputPath(job, outPath);
> > +
> > + FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
> > + if (dfs.exists(outPath)) {
> > + dfs.delete(outPath, true);
> > + }
> > +
> > + job.setInputFormatClass(TextInputFormat.class);
> > + job.setMapperClass(TransactionSortingMapper.class);
> > + job.setReducerClass(TransactionSortingReducer.class);
> > + job.setOutputFormatClass(SequenceFileOutputFormat.class);
> > +
> > + job.waitForCompletion(true);
> > + params.set("gList", gList);
> > + }
> > +
> > + /**
> > + * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K
> features of group dependent shards
> > + *
> > + * @param params
> > + * @throws IOException
> > + * @throws InterruptedException
> > + * @throws ClassNotFoundException
> > + */
> > + public static void startParallelFPGrowth(Parameters params) throws
> IOException, InterruptedException,
> > + ClassNotFoundException {
> > +
> > + Configuration conf = new Configuration();
> > + conf.set("pfp.parameters", params.toString());
> > + conf.set("mapred.compress.map.output", "true");
> > + conf.set("mapred.output.compression.type", "BLOCK");
> > + String input = params.get("output") + "/sortedoutput";
> > Job job = new Job(conf, "PFP Growth Driver running over input" +
> input);
> > job.setJarByClass(PFPGrowth.class);
> >
> > job.setMapOutputKeyClass(LongWritable.class);
> > - job.setMapOutputValueClass(IntegerTuple.class);
> > + job.setMapOutputValueClass(TransactionTree.class);
> >
> > job.setOutputKeyClass(Text.class);
> > job.setOutputValueClass(TopKStringPatterns.class);
> > @@ -358,8 +398,9 @@
> > dfs.delete(outPath, true);
> > }
> >
> > - job.setInputFormatClass(TextInputFormat.class);
> > + job.setInputFormatClass(SequenceFileInputFormat.class);
> > job.setMapperClass(ParallelFPGrowthMapper.class);
> > + job.setCombinerClass(ParallelFPGrowthCombiner.class);
> > job.setReducerClass(ParallelFPGrowthReducer.class);
> > job.setOutputFormatClass(SequenceFileOutputFormat.class);
> >
> > @@ -374,11 +415,9 @@
> > * @return Serialized String representation of List
> > * @throws IOException
> > */
> > - private static String serializeList(List<Pair<String, Long>> list,
> > - Configuration conf) throws IOException {
> > - conf.set(
> > - "io.serializations",
> > -
> "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> > + private static String serializeList(List<Pair<String, Long>> list,
> Configuration conf) throws IOException {
> > + conf.set("io.serializations",
> "org.apache.hadoop.io.serializer.JavaSerialization,"
> > + + "org.apache.hadoop.io.serializer.WritableSerialization");
> > DefaultStringifier<List<Pair<String, Long>>> listStringifier = new
> DefaultStringifier<List<Pair<String, Long>>>(
> > conf, GenericsUtil.getClass(list));
> > return listStringifier.toString(list);
> > @@ -392,13 +431,11 @@
> > * @return Serialized String representation of the GList Map
> > * @throws IOException
> > */
> > - private static String serializeMap(Map<String, Long> map,
> Configuration conf)
> > - throws IOException {
> > - conf.set(
> > - "io.serializations",
> > -
> "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
> > - DefaultStringifier<Map<String, Long>> mapStringifier = new
> DefaultStringifier<Map<String, Long>>(
> > - conf, GenericsUtil.getClass(map));
> > + private static String serializeMap(Map<String, Long> map,
> Configuration conf) throws IOException {
> > + conf.set("io.serializations",
> "org.apache.hadoop.io.serializer.JavaSerialization,"
> > + + "org.apache.hadoop.io.serializer.WritableSerialization");
> > + DefaultStringifier<Map<String, Long>> mapStringifier = new
> DefaultStringifier<Map<String, Long>>(conf,
> > + GenericsUtil.getClass(map));
> > return mapStringifier.toString(map);
> > }
> > }
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
> Thu Jan 7 16:45:37 2010
> > @@ -1,5 +1,5 @@
> > /**
> > - * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > * contributor license agreements. See the NOTICE file distributed with
> > * this work for additional information regarding copyright ownership.
> > * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > @@ -17,46 +17,44 @@
> >
> > package org.apache.mahout.fpm.pfpgrowth;
> >
> > +import java.io.IOException;
> > +import java.util.regex.Pattern;
> > +
> > import org.apache.hadoop.io.LongWritable;
> > import org.apache.hadoop.io.Text;
> > import org.apache.hadoop.mapreduce.Mapper;
> > import org.apache.mahout.common.Parameters;
> >
> > -import java.io.IOException;
> > -import java.util.regex.Pattern;
> > -
> > /**
> > *
> > - * {@link ParallelCountingMapper} maps all items in a particular
> transaction
> > - * like the way it is done in Hadoop WordCount example
> > + * {@link ParallelCountingMapper} maps all items in a particular
> transaction like the way it is done in Hadoop
> > + * WordCount example
> > *
> > */
> > -public class ParallelCountingMapper extends
> > - Mapper<LongWritable, Text, Text, LongWritable> {
> > +public class ParallelCountingMapper extends Mapper<LongWritable, Text,
> Text, LongWritable> {
> >
> > private static final LongWritable one = new LongWritable(1);
> >
> > private Pattern splitter = null;
> >
> > @Override
> > - protected void map(LongWritable offset, Text input, Context context)
> > - throws IOException, InterruptedException {
> > + protected void map(LongWritable offset, Text input, Context context)
> throws IOException,
> > + InterruptedException {
> >
> > String[] items = splitter.split(input.toString());
> > - for (String item : items){
> > - if(item.trim().length()==0) continue;
> > - context.setStatus("Parallel Counting Mapper: "+ item);
> > + for (String item : items) {
> > + if (item.trim().length() == 0) {
> > + continue;
> > + }
> > + context.setStatus("Parallel Counting Mapper: " + item);
> > context.write(new Text(item), one);
> > }
> > }
> >
> > @Override
> > - protected void setup(Context context) throws IOException,
> > - InterruptedException {
> > + protected void setup(Context context) throws IOException,
> InterruptedException {
> > super.setup(context);
> > - Parameters params =
> Parameters.fromString(context.getConfiguration().get(
> > - "pfp.parameters", ""));
> > - splitter = Pattern.compile(params.get("splitPattern",
> PFPGrowth.SPLITTER
> > - .toString()));
> > + Parameters params =
> Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
> > + splitter = Pattern.compile(params.get("splitPattern",
> PFPGrowth.SPLITTER.toString()));
> > }
> > }
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
> Thu Jan 7 16:45:37 2010
> > @@ -17,23 +17,21 @@
> >
> > package org.apache.mahout.fpm.pfpgrowth;
> >
> > +import java.io.IOException;
> > +
> > import org.apache.hadoop.io.LongWritable;
> > import org.apache.hadoop.io.Text;
> > import org.apache.hadoop.mapreduce.Reducer;
> >
> > -import java.io.IOException;
> > -
> > /**
> > - * {@link ParallelCountingReducer} sums up the item count and output the
> item
> > - * and the count This can also be used as a local Combiner. A simple
> summing
> > - * reducer
> > + * {@link ParallelCountingReducer} sums up the item count and outputs the
> item and the count. This can also be
> > + * used as a local Combiner. A simple summing reducer
> > */
> > -public class ParallelCountingReducer extends
> > - Reducer<Text, LongWritable, Text, LongWritable> {
> > +public class ParallelCountingReducer extends Reducer<Text, LongWritable,
> Text, LongWritable> {
> >
> > @Override
> > - protected void reduce(Text key, Iterable<LongWritable> values, Context
> context)
> > - throws IOException, InterruptedException {
> > + protected void reduce(Text key, Iterable<LongWritable> values, Context
> context) throws IOException,
> > + InterruptedException {
> > long sum = 0;
> > for (LongWritable value : values) {
> > context.setStatus("Parallel Counting Reducer :" + key);
> >
> > Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java?rev=896922&view=auto
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java
> (added)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthCombiner.java
> Thu Jan 7 16:45:37 2010
> > @@ -0,0 +1,55 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.mahout.fpm.pfpgrowth;
> > +
> > +import java.io.IOException;
> > +import java.util.Iterator;
> > +import java.util.List;
> > +
> > +import org.apache.hadoop.io.LongWritable;
> > +import org.apache.hadoop.mapreduce.Reducer;
> > +import org.apache.mahout.common.Pair;
> > +
> > +/**
> > + * {@link ParallelFPGrowthCombiner} takes each group of dependent
> transactions
> > + * and compacts it into a TransactionTree structure
> > + */
> > +
> > +public class ParallelFPGrowthCombiner extends
> > + Reducer<LongWritable, TransactionTree, LongWritable,
> TransactionTree> {
> > +
> > + @Override
> > + protected void reduce(LongWritable key, Iterable<TransactionTree>
> values,
> > + Context context) throws IOException, InterruptedException {
> > + TransactionTree cTree = new TransactionTree();
> > + int count = 0;
> > + int node = 0;
> > + for (TransactionTree tr : values) {
> > + Iterator<Pair<List<Integer>, Long>> it = tr.getIterator();
> > + while (it.hasNext()) {
> > + Pair<List<Integer>, Long> p = it.next();
> > + node += cTree.addPattern(p.getFirst(), p.getSecond());
> > + count++;
> > + }
> > + }
> > +
> > + context.write(key, cTree.getCompressedTree());
> > +
> > + }
> > +
> > +}
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
> Thu Jan 7 16:45:37 2010
> > @@ -17,93 +17,73 @@
> >
> > package org.apache.mahout.fpm.pfpgrowth;
> >
> > -import org.apache.hadoop.io.LongWritable;
> > -import org.apache.hadoop.io.Text;
> > -import org.apache.hadoop.mapreduce.Mapper;
> > -import org.apache.mahout.common.IntegerTuple;
> > -import org.apache.mahout.common.Pair;
> > -import org.apache.mahout.common.Parameters;
> > import java.io.IOException;
> > -import java.util.ArrayList;
> > -import java.util.Collections;
> > import java.util.HashMap;
> > import java.util.HashSet;
> > +import java.util.Iterator;
> > import java.util.List;
> > import java.util.Map;
> > -import java.util.Set;
> > import java.util.Map.Entry;
> > -import java.util.regex.Pattern;
> > +import java.util.Set;
> > +
> > +import org.apache.hadoop.io.LongWritable;
> > +import org.apache.hadoop.mapreduce.Mapper;
> > +import org.apache.mahout.common.Pair;
> > +import org.apache.mahout.common.Parameters;
> >
> > /**
> > - * {@link ParallelFPGrowthMapper} maps each transaction to all unique
> items
> > - * groups in the transaction. mapper outputs the group id as key and the
> > - * transaction as value
> > + * {@link ParallelFPGrowthMapper} maps each transaction to all unique
> items groups in the transaction. mapper
> > + * outputs the group id as key and the transaction as value
> > *
> > */
> > public class ParallelFPGrowthMapper extends
> > - Mapper<LongWritable, Text, LongWritable, IntegerTuple> {
> > -
> > - private final Map<String, Integer> fMap = new HashMap<String,
> Integer>();
> > + Mapper<LongWritable, TransactionTree, LongWritable, TransactionTree>
> {
> >
> > private final Map<Integer, Long> gListInt = new HashMap<Integer,
> Long>();
> >
> > - private Pattern splitter = null;
> > -
> > @Override
> > - protected void map(LongWritable offset, Text input, Context context)
> > - throws IOException, InterruptedException {
> > -
> > - String[] items = splitter.split(input.toString());
> > -
> > - List<Integer> itemSet = new ArrayList<Integer>();
> > - for (String item : items) // remove items not in the fList
> > - {
> > - if (fMap.containsKey(item) && item.trim().length() != 0)
> > - itemSet.add(fMap.get(item));
> > - }
> > -
> > - Collections.sort(itemSet);
> > + protected void map(LongWritable offset, TransactionTree input, Context
> context) throws IOException,
> > + InterruptedException {
> >
> > - Integer[] prunedItems = itemSet.toArray(new
> Integer[itemSet.size()]);
> > -
> > - Set<Long> groups = new HashSet<Long>();
> > - for (int j = prunedItems.length - 1; j >= 0; j--) { // generate
> group
> > - // dependent
> > - // shards
> > - Integer item = prunedItems[j];
> > - Long groupID = gListInt.get(item);
> > - if (groups.contains(groupID) == false) {
> > - Integer[] tempItems = new Integer[j + 1];
> > - System.arraycopy(prunedItems, 0, tempItems, 0, j + 1);
> > - context
> > - .setStatus("Parallel FPGrowth: Generating Group Dependent
> transactions for: "
> > - + item);
> > - context.write(new LongWritable(groupID), new
> IntegerTuple(tempItems));
> > + Iterator<Pair<List<Integer>, Long>> it = input.getIterator();
> > + while (it.hasNext()) {
> > + Pair<List<Integer>, Long> pattern = it.next();
> > + Integer[] prunedItems = pattern.getFirst().toArray(new
> Integer[pattern.getFirst().size()]);
> > +
> > + Set<Long> groups = new HashSet<Long>();
> > + for (int j = prunedItems.length - 1; j >= 0; j--) { // generate
> group
> > + // dependent
> > + // shards
> > + Integer item = prunedItems[j];
> > + Long groupID = gListInt.get(item);
> > +
> > + if (groups.contains(groupID) == false) {
> > + Integer[] tempItems = new Integer[j + 1];
> > + System.arraycopy(prunedItems, 0, tempItems, 0, j + 1);
> > + context.setStatus("Parallel FPGrowth: Generating Group
> Dependent transactions for: " + item);
> > + context.write(new LongWritable(groupID), new
> TransactionTree(tempItems, pattern.getSecond()));
> > + }
> > + groups.add(groupID);
> > }
> > - groups.add(groupID);
> > }
> >
> > }
> >
> > @Override
> > - protected void setup(Context context) throws IOException,
> > - InterruptedException {
> > + protected void setup(Context context) throws IOException,
> InterruptedException {
> > super.setup(context);
> > - Parameters params =
> Parameters.fromString(context.getConfiguration().get(
> > - "pfp.parameters", ""));
> > + Parameters params =
> Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
> >
> > + Map<String, Integer> fMap = new HashMap<String, Integer>();
> > int i = 0;
> > - for (Pair<String, Long> e : PFPGrowth.deserializeList(params,
> "fList",
> > - context.getConfiguration())) {
> > + for (Pair<String, Long> e : PFPGrowth.deserializeList(params,
> "fList", context.getConfiguration())) {
> > fMap.put(e.getFirst(), i++);
> > }
> > -
> > - for (Entry<String, Long> e : PFPGrowth.deserializeMap(params,
> "gList",
> > - context.getConfiguration()).entrySet()) {
> > +
> > + for (Entry<String, Long> e : PFPGrowth.deserializeMap(params,
> "gList", context.getConfiguration())
> > + .entrySet()) {
> > gListInt.put(fMap.get(e.getKey()), e.getValue());
> > }
> > - splitter = Pattern.compile(params.get("splitPattern",
> PFPGrowth.SPLITTER
> > - .toString()));
> >
> > }
> > }
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
> Thu Jan 7 16:45:37 2010
> > @@ -17,27 +17,30 @@
> >
> > package org.apache.mahout.fpm.pfpgrowth;
> >
> > +import java.io.IOException;
> > +import java.util.ArrayList;
> > +import java.util.Collections;
> > +import java.util.Comparator;
> > +import java.util.HashMap;
> > +import java.util.HashSet;
> > +import java.util.Iterator;
> > +import java.util.List;
> > +import java.util.Map;
> > +import java.util.Map.Entry;
> > +
> > +import org.apache.commons.lang.mutable.MutableLong;
> > import org.apache.hadoop.io.LongWritable;
> > import org.apache.hadoop.io.Text;
> > import org.apache.hadoop.mapreduce.Reducer;
> > -import org.apache.mahout.common.IntegerTuple;
> > import org.apache.mahout.common.Pair;
> > import org.apache.mahout.common.Parameters;
> > +import org.apache.mahout.fpm.pfpgrowth.convertors.ContextStatusUpdater;
> > import
> org.apache.mahout.fpm.pfpgrowth.convertors.ContextWriteOutputCollector;
> > import
> org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerStringOutputConvertor;
> > -import
> org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerTupleIterator;
> > import
> org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
> > import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth;
> > import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPTreeDepthCache;
> >
> > -import java.io.IOException;
> > -import java.util.ArrayList;
> > -import java.util.HashMap;
> > -import java.util.HashSet;
> > -import java.util.List;
> > -import java.util.Map;
> > -import java.util.Map.Entry;
> > -
> > /**
> > * {@link ParallelFPGrowthReducer} takes each group of transactions and
> runs
> > * Vanilla FPGrowth on it and outputs the Top K frequent Patterns for
> each
> > @@ -46,14 +49,16 @@
> > */
> >
> > public class ParallelFPGrowthReducer extends
> > - Reducer<LongWritable, IntegerTuple, Text, TopKStringPatterns> {
> > + Reducer<LongWritable, TransactionTree, Text, TopKStringPatterns> {
> >
> > private final List<Pair<Integer, Long>> fList = new
> ArrayList<Pair<Integer, Long>>();
> > -
> > +
> > private final List<String> featureReverseMap = new ArrayList<String>();
> > -
> > +
> > private final Map<String, Integer> fMap = new HashMap<String,
> Integer>();
> >
> > + private final List<String> fRMap = new ArrayList<String>();
> > +
> > private final Map<Long, List<Integer>> groupFeatures = new
> HashMap<Long, List<Integer>>();
> >
> > private int maxHeapSize = 50;
> > @@ -61,48 +66,81 @@
> > private int minSupport = 3;
> >
> > @Override
> > - protected void reduce(LongWritable key, Iterable<IntegerTuple> values,
> > + protected void reduce(LongWritable key, Iterable<TransactionTree>
> values,
> > Context context) throws IOException {
> > + TransactionTree cTree = new TransactionTree();
> > + int nodes = 0;
> > + for (TransactionTree tr : values) {
> > + Iterator<Pair<List<Integer>, Long>> it = tr.getIterator();
> > + while (it.hasNext()) {
> > + Pair<List<Integer>, Long> p = it.next();
> > + nodes += cTree.addPattern(p.getFirst(), p.getSecond());
> > + }
> > + }
> > +
> > + List<Pair<Integer, Long>> localFList = new ArrayList<Pair<Integer,
> Long>>();
> > + for (Entry<Integer, MutableLong> fItem :
> cTree.generateFList().entrySet()) {
> > + localFList.add(new Pair<Integer, Long>(fItem.getKey(),
> fItem.getValue()
> > + .toLong()));
> > +
> > + }
> > +
> > + Collections.sort(localFList, new Comparator<Pair<Integer, Long>>() {
> > +
> > + @Override
> > + public int compare(Pair<Integer, Long> o1, Pair<Integer, Long> o2)
> {
> > + int ret = o2.getSecond().compareTo(o1.getSecond());
> > + if (ret != 0) {
> > + return ret;
> > + }
> > + return o1.getFirst().compareTo(o2.getFirst());
> > + }
> > +
> > + });
> > +
> > +
> > FPGrowth<Integer> fpGrowth = new FPGrowth<Integer>();
> > fpGrowth
> > .generateTopKFrequentPatterns(
> > - new IntegerTupleIterator(values.iterator()),
> > - fList,
> > + cTree.getIterator(),
> > + localFList,
> > minSupport,
> > maxHeapSize,
> > new HashSet<Integer>(groupFeatures.get(key.get())),
> > new IntegerStringOutputConvertor(
> > - new ContextWriteOutputCollector<LongWritable,
> IntegerTuple, Text, TopKStringPatterns>(
> > - context), featureReverseMap));
> > + new ContextWriteOutputCollector<LongWritable,
> TransactionTree, Text, TopKStringPatterns>(
> > + context), featureReverseMap),
> > + new ContextStatusUpdater<LongWritable, TransactionTree,
> Text, TopKStringPatterns>(
> > + context));
> > }
> >
> > @Override
> > - protected void setup(Context context) throws IOException,
> InterruptedException {
> > + protected void setup(Context context) throws IOException,
> > + InterruptedException {
> >
> > super.setup(context);
> > Parameters params =
> Parameters.fromString(context.getConfiguration().get(
> > "pfp.parameters", ""));
> > -
> > -
> > -
> > +
> > int i = 0;
> > - for(Pair<String, Long> e: PFPGrowth.deserializeList(params, "fList",
> context
> > - .getConfiguration()))
> > - {
> > + for (Pair<String, Long> e : PFPGrowth.deserializeList(params,
> "fList",
> > + context.getConfiguration())) {
> > featureReverseMap.add(e.getFirst());
> > fMap.put(e.getFirst(), i);
> > + fRMap.add(e.getFirst());
> > fList.add(new Pair<Integer, Long>(i++, e.getSecond()));
> > +
> > }
> > -
> > +
> > Map<String, Long> gList = PFPGrowth.deserializeMap(params, "gList",
> context
> > .getConfiguration());
> > -
> > +
> > for (Entry<String, Long> entry : gList.entrySet()) {
> > List<Integer> groupList = groupFeatures.get(entry.getValue());
> > Integer itemInteger = fMap.get(entry.getKey());
> > - if (groupList != null)
> > + if (groupList != null) {
> > groupList.add(itemInteger);
> > - else {
> > + } else {
> > groupList = new ArrayList<Integer>();
> > groupList.add(itemInteger);
> > groupFeatures.put(entry.getValue(), groupList);
> >
> > Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java?rev=896922&view=auto
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
> (added)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
> Thu Jan 7 16:45:37 2010
> > @@ -0,0 +1,85 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.mahout.fpm.pfpgrowth;
> > +
> > +import java.io.IOException;
> > +import java.util.ArrayList;
> > +import java.util.Arrays;
> > +import java.util.Collections;
> > +import java.util.HashMap;
> > +import java.util.HashSet;
> > +import java.util.List;
> > +import java.util.Map;
> > +import java.util.Set;
> > +import java.util.regex.Pattern;
> > +
> > +import org.apache.hadoop.io.LongWritable;
> > +import org.apache.hadoop.io.Text;
> > +import org.apache.hadoop.mapreduce.Mapper;
> > +import org.apache.mahout.common.Pair;
> > +import org.apache.mahout.common.Parameters;
> > +
> > +/**
> > + * {@link TransactionSortingMapper} maps each transaction to the sorted
> > + * ids of its unique items that appear in the fList. The mapper outputs the
> > + * first (smallest) item id as key and the pruned transaction as value.
> > + *
> > + */
> > +public class TransactionSortingMapper extends Mapper<LongWritable, Text,
> LongWritable, TransactionTree> {
> > +
> > + private final Map<String, Integer> fMap = new HashMap<String,
> Integer>();
> > +
> > + private Pattern splitter = null;
> > +
> > + @Override
> > + protected void map(LongWritable offset, Text input, Context context)
> throws IOException,
> > + InterruptedException {
> > +
> > + String[] items = splitter.split(input.toString());
> > + Set<String> uniqueItems = new HashSet<String>(Arrays.asList(items));
> > +
> > + List<Integer> itemSet = new ArrayList<Integer>();
> > + for (String item : uniqueItems) { // remove items not in the fList
> > + if (fMap.containsKey(item) && item.trim().length() != 0) {
> > + itemSet.add(fMap.get(item));
> > + }
> > + }
> > +
> > + Collections.sort(itemSet);
> > +
> > + Integer[] prunedItems = itemSet.toArray(new
> Integer[itemSet.size()]);
> > +
> > + if (prunedItems.length > 0) {
> > + context.write(new LongWritable(prunedItems[0]), new
> TransactionTree(prunedItems, 1L));
> > + }
> > +
> > + }
> > +
> > + @Override
> > + protected void setup(Context context) throws IOException,
> InterruptedException {
> > + super.setup(context);
> > + Parameters params =
> Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
> > +
> > + int i = 0;
> > + for (Pair<String, Long> e : PFPGrowth.deserializeList(params,
> "fList", context.getConfiguration())) {
> > + fMap.put(e.getFirst(), i++);
> > + }
> > +
> > + splitter = Pattern.compile(params.get("splitPattern",
> PFPGrowth.SPLITTER.toString()));
> > +
> > + }
> > +}
> >
> > Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java?rev=896922&view=auto
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java
> (added)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingReducer.java
> Thu Jan 7 16:45:37 2010
> > @@ -0,0 +1,42 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.mahout.fpm.pfpgrowth;
> > +
> > +import java.io.IOException;
> > +
> > +import org.apache.hadoop.io.LongWritable;
> > +import org.apache.hadoop.mapreduce.Reducer;
> > +/**
> > + * {@link TransactionSortingReducer} re-emits every transaction in each
> > + * group unchanged, keyed under a single constant key (1).
> > + *
> > + */
> > +
> > +public class TransactionSortingReducer extends
> > + Reducer<LongWritable, TransactionTree, LongWritable,
> TransactionTree> {
> > +
> > + private static final LongWritable one = new LongWritable(1);
> > + @Override
> > + protected void reduce(LongWritable key, Iterable<TransactionTree>
> values,
> > + Context context) throws IOException, InterruptedException {
> > + for (TransactionTree tr : values) {
> > + context.write(one, tr);
> > + }
> > + }
> > +}
> >
> > Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java?rev=896922&view=auto
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java
> (added)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionTree.java
> Thu Jan 7 16:45:37 2010
> > @@ -0,0 +1,461 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.mahout.fpm.pfpgrowth;
> > +
> > +import java.io.DataInput;
> > +import java.io.DataOutput;
> > +import java.io.IOException;
> > +import java.util.ArrayList;
> > +import java.util.Arrays;
> > +import java.util.Collections;
> > +import java.util.Comparator;
> > +import java.util.HashMap;
> > +import java.util.Iterator;
> > +import java.util.List;
> > +import java.util.Map;
> > +import java.util.Stack;
> > +
> > +import org.apache.commons.lang.mutable.MutableLong;
> > +import org.apache.hadoop.io.VIntWritable;
> > +import org.apache.hadoop.io.VLongWritable;
> > +import org.apache.hadoop.io.Writable;
> > +import org.apache.mahout.common.Pair;
> > +import org.slf4j.Logger;
> > +import org.slf4j.LoggerFactory;
> > +
> > +public final class TransactionTree implements Writable {
> > +
> > + public final class TransactionTreeIterator implements
> Iterator<Pair<List<Integer>, Long>> {
> > +
> > + Stack<int[]> depth = new Stack<int[]>();
> > +
> > + public TransactionTreeIterator() {
> > + depth.push(new int[] {0, -1});
> > + }
> > +
> > + @Override
> > + public boolean hasNext() {
> > + if (depth.isEmpty()) {
> > + return false;
> > + }
> > + return true;
> > + }
> > +
> > + @Override
> > + public Pair<List<Integer>, Long> next() {
> > +
> > + long sum = 0;
> > + int childId = 0;
> > + do {
> > + int[] top = depth.peek();
> > + while (top[1] + 1 == childCount[top[0]]) {
> > + depth.pop();
> > + top = depth.peek();
> > + }
> > + if (depth.isEmpty()) {
> > + return null;
> > + }
> > + top[1]++;
> > + childId = nodeChildren[top[0]][top[1]];
> > + depth.push(new int[] {childId, -1});
> > +
> > + sum = 0;
> > + for (int i = childCount[childId] - 1; i >= 0; i--) {
> > + sum += nodeCount[nodeChildren[childId][i]];
> > + }
> > + } while (sum == nodeCount[childId]);
> > +
> > + List<Integer> data = new ArrayList<Integer>();
> > + Iterator<int[]> it = depth.iterator();
> > + it.next();
> > + while (it.hasNext()) {
> > + data.add(attribute[it.next()[0]]);
> > + }
> > +
> > + Pair<List<Integer>, Long> returnable = new Pair<List<Integer>,
> Long>(data, nodeCount[childId] - sum);
> > +
> > + int[] top = depth.peek();
> > + while (top[1] + 1 == childCount[top[0]]) {
> > + depth.pop();
> > + if (depth.isEmpty()) {
> > + break;
> > + }
> > + top = depth.peek();
> > + }
> > + return returnable;
> > + }
> > +
> > + @Override
> > + public void remove() {
> > + throw new UnsupportedOperationException();
> > + }
> > +
> > + }
> > +
> > + private static final int DEFAULT_CHILDREN_INITIAL_SIZE = 2;
> > +
> > + private static final int DEFAULT_INITIAL_SIZE = 8;
> > +
> > + private static final float GROWTH_RATE = 1.5f;
> > +
> > + private static final Logger log =
> LoggerFactory.getLogger(TransactionTree.class);
> > +
> > + private static final int ROOTNODEID = 0;
> > +
> > + private int[] attribute;
> > +
> > + private int[] childCount;
> > +
> > + private int[][] nodeChildren;
> > +
> > + private long[] nodeCount;
> > +
> > + private int nodes = 0;
> > +
> > + private boolean representedAsList = false;
> > +
> > + private List<Pair<List<Integer>, Long>> transactionSet = new
> ArrayList<Pair<List<Integer>, Long>>();
> > +
> > + public TransactionTree() {
> > + this(DEFAULT_INITIAL_SIZE);
> > + representedAsList = false;
> > + }
> > +
> > + public TransactionTree(int size) {
> > + if (size < DEFAULT_INITIAL_SIZE) {
> > + size = DEFAULT_INITIAL_SIZE;
> > + }
> > + childCount = new int[size];
> > + attribute = new int[size];
> > + nodeCount = new long[size];
> > + nodeChildren = new int[size][];
> > + createRootNode();
> > + representedAsList = false;
> > + }
> > +
> > + public TransactionTree(Integer[] items, Long support) {
> > + representedAsList = true;
> > + transactionSet.add(new Pair<List<Integer>,
> Long>(Arrays.asList(items), support));
> > + }
> > +
> > + public TransactionTree(List<Pair<List<Integer>, Long>> transactionSet)
> {
> > + representedAsList = true;
> > + this.transactionSet = transactionSet;
> > + }
> > +
> > + public void addChild(int parentNodeId, int childnodeId) {
> > + int length = childCount[parentNodeId];
> > + if (length >= nodeChildren[parentNodeId].length) {
> > + resizeChildren(parentNodeId);
> > + }
> > + nodeChildren[parentNodeId][length++] = childnodeId;
> > + childCount[parentNodeId] = length;
> > +
> > + }
> > +
> > + public boolean addCount(int nodeId, long nextNodeCount) {
> > + if (nodeId < nodes) {
> > + this.nodeCount[nodeId] += nextNodeCount;
> > + return true;
> > + }
> > + return false;
> > + }
> > +
> > + public int addPattern(List<Integer> myList, long addCount) {
> > + int temp = TransactionTree.ROOTNODEID;
> > + int ret = 0;
> > + boolean addCountMode = true;
> > + int child = -1;
> > + for (int attributeValue : myList) {
> > +
> > + if (addCountMode) {
> > + child = childWithAttribute(temp, attributeValue);
> > + if (child == -1) {
> > + addCountMode = false;
> > + } else {
> > + addCount(child, addCount);
> > + temp = child;
> > + }
> > + }
> > + if (!addCountMode) {
> > + child = createNode(temp, attributeValue, addCount);
> > + temp = child;
> > + ret++;
> > + }
> > + }
> > + return ret;
> > + }
> > +
> > + public int attribute(int nodeId) {
> > + return this.attribute[nodeId];
> > + }
> > +
> > + public int childAtIndex(int nodeId, int index) {
> > + if (childCount[nodeId] < index) {
> > + return -1;
> > + }
> > + return nodeChildren[nodeId][index];
> > + }
> > +
> > + public int childCount() {
> > + int sum = 0;
> > + for (int i = 0; i < nodes; i++) {
> > + sum += childCount[i];
> > + }
> > + return sum;
> > + }
> > +
> > + public int childCount(int nodeId) {
> > + return childCount[nodeId];
> > + }
> > +
> > + public int childWithAttribute(int nodeId, int childAttribute) {
> > + int length = childCount[nodeId];
> > + for (int i = 0; i < length; i++) {
> > + if (attribute[nodeChildren[nodeId][i]] == childAttribute) {
> > + return nodeChildren[nodeId][i];
> > + }
> > + }
> > + return -1;
> > + }
> > +
> > + public long count(int nodeId) {
> > + return nodeCount[nodeId];
> > + }
> > +
> > + public Map<Integer, MutableLong> generateFList() {
> > + Map<Integer, MutableLong> frequencyList = new HashMap<Integer,
> MutableLong>();
> > + Iterator<Pair<List<Integer>, Long>> it = getIterator();
> > + int items = 0;
> > + int count = 0;
> > + while (it.hasNext()) {
> > + Pair<List<Integer>, Long> p = it.next();
> > + items += p.getFirst().size();
> > + count++;
> > + for (Integer i : p.getFirst()) {
> > + if (frequencyList.containsKey(i) == false) {
> > + frequencyList.put(i, new MutableLong(0));
> > + }
> > + frequencyList.get(i).add(p.getSecond());
> > + }
> > + }
> > + return frequencyList;
> > + }
> > +
> > + public TransactionTree getCompressedTree() {
> > + TransactionTree ctree = new TransactionTree();
> > + Iterator<Pair<List<Integer>, Long>> it = getIterator();
> > + final Map<Integer, MutableLong> fList = generateFList();
> > + int node = 0;
> > + Comparator<Integer> comparator = new Comparator<Integer>() {
> > +
> > + @Override
> > + public int compare(Integer o1, Integer o2) {
> > + return fList.get(o2).compareTo(fList.get(o1));
> > + }
> > +
> > + };
> > + int size = 0;
> > + List<Pair<List<Integer>, Long>> compressedTransactionSet = new
> ArrayList<Pair<List<Integer>, Long>>();
> > + while (it.hasNext()) {
> > + Pair<List<Integer>, Long> p = it.next();
> > + Collections.sort(p.getFirst(), comparator);
> > + compressedTransactionSet.add(p);
> > + node += ctree.addPattern(p.getFirst(), p.getSecond());
> > + size += p.getFirst().size() + 2;
> > + }
> > +
> > + log.debug("Nodes in UnCompressed Tree: {} ", nodes);
> > + log
> > + .debug("UnCompressed Tree Size: {}", (this.nodes * 4 * 4 +
> this.childCount() * 4)
> > + / (double) 1000000);
> > + log.debug("Nodes in Compressed Tree: {} ", node);
> > + log.debug("Compressed Tree Size: {}", (node * 4 * 4 +
> ctree.childCount() * 4) / (double) 1000000);
> > + log.debug("TransactionSet Size: {}", (size * 4) / (double) 1000000);
> > + if (node * 4 * 4 + ctree.childCount() * 4 <= size * 4) {
> > + return ctree;
> > + } else {
> > + ctree = new TransactionTree(compressedTransactionSet);
> > + return ctree;
> > + }
> > + }
> > +
> > + public Iterator<Pair<List<Integer>, Long>> getIterator() {
> > + if (this.isTreeEmpty() && !representedAsList) {
> > + throw new IllegalStateException("This is a bug. Please report this
> to mahout-user list");
> > + } else if (representedAsList) {
> > + return transactionSet.iterator();
> > + } else {
> > + return new TransactionTreeIterator();
> > + }
> > + }
> > +
> > + public boolean isTreeEmpty() {
> > + return nodes <= 1;
> > + }
> > +
> > + @Override
> > + public void readFields(DataInput in) throws IOException {
> > + representedAsList = in.readBoolean();
> > +
> > + VIntWritable vInt = new VIntWritable();
> > + VLongWritable vLong = new VLongWritable();
> > +
> > + if (representedAsList) {
> > + transactionSet = new ArrayList<Pair<List<Integer>, Long>>();
> > + vInt.readFields(in);
> > + int numTransactions = vInt.get();
> > + for (int i = 0; i < numTransactions; i++) {
> > + vLong.readFields(in);
> > + Long support = vLong.get();
> > +
> > + vInt.readFields(in);
> > + int length = vInt.get();
> > +
> > + Integer[] items = new Integer[length];
> > + for (int j = 0; j < length; j++) {
> > + vInt.readFields(in);
> > + items[j] = vInt.get();
> > + }
> > + Pair<List<Integer>, Long> transaction = new Pair<List<Integer>,
> Long>(Arrays.asList(items), support);
> > + transactionSet.add(transaction);
> > + }
> > + } else {
> > + vInt.readFields(in);
> > + nodes = vInt.get();
> > + attribute = new int[nodes];
> > + nodeCount = new long[nodes];
> > + childCount = new int[nodes];
> > + nodeChildren = new int[nodes][];
> > + for (int i = 0; i < nodes; i++) {
> > + vInt.readFields(in);
> > + attribute[i] = vInt.get();
> > + vLong.readFields(in);
> > + nodeCount[i] = vLong.get();
> > + vInt.readFields(in);
> > + childCount[i] = vInt.get();
> > + nodeChildren[i] = new int[childCount[i]];
> > + for (int j = 0, k = childCount[i]; j < k; j++) {
> > + vInt.readFields(in);
> > + nodeChildren[i][j] = vInt.get();
> > + }
> > + }
> > + }
> > + }
> > +
> > + @Override
> > + public void write(DataOutput out) throws IOException {
> > + out.writeBoolean(representedAsList);
> > + VIntWritable vInt = new VIntWritable();
> > + VLongWritable vLong = new VLongWritable();
> > + if (representedAsList) {
> > + vInt.set(transactionSet.size());
> > + vInt.write(out);
> > + for (int i = 0, j = transactionSet.size(); i < j; i++) {
> > + Pair<List<Integer>, Long> transaction = transactionSet.get(i);
> > +
> > + vLong.set(transaction.getSecond().longValue());
> > + vLong.write(out);
> > +
> > + vInt.set(transaction.getFirst().size());
> > + vInt.write(out);
> > +
> > + for (Integer item : transaction.getFirst()) {
> > + vInt.set(item);
> > + vInt.write(out);
> > + }
> > + }
> > + } else {
> > + vInt.set(nodes);
> > + vInt.write(out);
> > + for (int i = 0; i < nodes; i++) {
> > + vInt.set(attribute[i]);
> > + vInt.write(out);
> > + vLong.set(nodeCount[i]);
> > + vLong.write(out);
> > + vInt.set(childCount[i]);
> > + vInt.write(out);
> > + for (int j = 0, k = childCount[i]; j < k; j++) {
> > + vInt.set(nodeChildren[i][j]);
> > + vInt.write(out);
> > + }
> > + }
> > + }
> > + }
> > +
> > + private int createNode(int parentNodeId, int attributeValue, long
> count) {
> > + if (nodes >= this.attribute.length) {
> > + resize();
> > + }
> > +
> > + childCount[nodes] = 0;
> > + this.attribute[nodes] = attributeValue;
> > + nodeCount[nodes] = count;
> > + if (nodeChildren[nodes] == null) {
> > + nodeChildren[nodes] = new int[DEFAULT_CHILDREN_INITIAL_SIZE];
> > + }
> > +
> > + int childNodeId = nodes++;
> > + addChild(parentNodeId, childNodeId);
> > + return childNodeId;
> > + }
> > +
> > + private int createRootNode() {
> > + childCount[nodes] = 0;
> > + attribute[nodes] = -1;
> > + nodeCount[nodes] = 0;
> > + if (nodeChildren[nodes] == null) {
> > + nodeChildren[nodes] = new int[DEFAULT_CHILDREN_INITIAL_SIZE];
> > + }
> > + int childNodeId = nodes++;
> > + return childNodeId;
> > + }
> > +
> > + private void resize() {
> > + int size = (int) (GROWTH_RATE * nodes);
> > + if (size < DEFAULT_INITIAL_SIZE) {
> > + size = DEFAULT_INITIAL_SIZE;
> > + }
> > +
> > + int[] oldChildCount = childCount;
> > + int[] oldAttribute = attribute;
> > + long[] oldnodeCount = nodeCount;
> > + int[][] oldNodeChildren = nodeChildren;
> > +
> > + childCount = new int[size];
> > + attribute = new int[size];
> > + nodeCount = new long[size];
> > + nodeChildren = new int[size][];
> > +
> > + System.arraycopy(oldChildCount, 0, this.childCount, 0, nodes);
> > + System.arraycopy(oldAttribute, 0, this.attribute, 0, nodes);
> > + System.arraycopy(oldnodeCount, 0, this.nodeCount, 0, nodes);
> > + System.arraycopy(oldNodeChildren, 0, this.nodeChildren, 0, nodes);
> > + }
> > +
> > + private void resizeChildren(int nodeId) {
> > + int length = childCount[nodeId];
> > + int size = (int) (GROWTH_RATE * length);
> > + if (size < DEFAULT_CHILDREN_INITIAL_SIZE) {
> > + size = DEFAULT_CHILDREN_INITIAL_SIZE;
> > + }
> > + int[] oldNodeChildren = nodeChildren[nodeId];
> > + nodeChildren[nodeId] = new int[size];
> > + System.arraycopy(oldNodeChildren, 0, this.nodeChildren[nodeId], 0,
> length);
> > + }
> > +}
> >
> > Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java?rev=896922&view=auto
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java
> (added)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextStatusUpdater.java
> Thu Jan 7 16:45:37 2010
> > @@ -0,0 +1,46 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.mahout.fpm.pfpgrowth.convertors;
> > +
> > +import org.apache.hadoop.io.Writable;
> > +import org.apache.hadoop.mapreduce.Reducer;
> > +
> > +public class ContextStatusUpdater<IK extends Writable, IV extends
> Writable,
> > + K extends Writable, V extends Writable> implements StatusUpdater {
> > +
> > + private static final long PERIOD = 10000; // Update every 10 seconds
> > +
> > + private final Reducer<IK, IV, K, V>.Context context;
> > +
> > + private long time = System.currentTimeMillis();
> > +
> > + public ContextStatusUpdater(Reducer<IK, IV, K, V>.Context context) {
> > + this.context = context;
> > + }
> > +
> > + @Override
> > + public void update(String status) {
> > + long curTime = System.currentTimeMillis();
> > + if (curTime - time > PERIOD && context != null) {
> > + time = curTime;
> > + context.setStatus("Processing FPTree: " + status);
> > + }
> > +
> > + }
> > +
> > +}
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
> Thu Jan 7 16:45:37 2010
> > @@ -25,8 +25,8 @@
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >
> > -public class ContextWriteOutputCollector<IK extends Writable, IV extends
> Writable, K extends Writable, V extends Writable>
> > - implements OutputCollector<K, V> {
> > +public class ContextWriteOutputCollector<IK extends Writable, IV extends
> Writable,
> > + K extends Writable, V extends Writable> implements
> OutputCollector<K, V> {
> >
> > private static final Logger log = LoggerFactory
> > .getLogger(ContextWriteOutputCollector.class);
> >
> > Added:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java?rev=896922&view=auto
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java
> (added)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/StatusUpdater.java
> Thu Jan 7 16:45:37 2010
> > @@ -0,0 +1,23 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.mahout.fpm.pfpgrowth.convertors;
> > +
> > +public interface StatusUpdater {
> > +
> > + void update(String status);
> > +}
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
> Thu Jan 7 16:45:37 2010
> > @@ -19,40 +19,46 @@
> >
> > import java.io.IOException;
> > import java.util.ArrayList;
> > +import java.util.Collections;
> > import java.util.List;
> > import java.util.Map;
> > +import java.util.PriorityQueue;
> > +
> > import org.apache.hadoop.mapred.OutputCollector;
> > import org.apache.mahout.common.Pair;
> > import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FrequentPatternMaxHeap;
> > import org.apache.mahout.fpm.pfpgrowth.fpgrowth.Pattern;
> >
> > -public final class TopKPatternsOutputConvertor<A> implements
> > +public final class TopKPatternsOutputConvertor<A extends Comparable<?
> super A>> implements
> > OutputCollector<Integer, FrequentPatternMaxHeap> {
> >
> > private OutputCollector<A, List<Pair<List<A>, Long>>> collector = null;
> >
> > private Map<Integer, A> reverseMapping = null;
> >
> > - public TopKPatternsOutputConvertor(
> > - OutputCollector<A, List<Pair<List<A>, Long>>> collector,
> > + public TopKPatternsOutputConvertor(OutputCollector<A,
> List<Pair<List<A>, Long>>> collector,
> > Map<Integer, A> reverseMapping) {
> > this.collector = collector;
> > this.reverseMapping = reverseMapping;
> > }
> >
> > @Override
> > - public void collect(Integer key, FrequentPatternMaxHeap value)
> > - throws IOException {
> > + public void collect(Integer key, FrequentPatternMaxHeap value) throws
> IOException {
> > List<Pair<List<A>, Long>> perAttributePatterns = new
> ArrayList<Pair<List<A>, Long>>();
> > - for (Pattern itemSet : value.getHeap()) {
> > + PriorityQueue<Pattern> t = value.getHeap();
> > + while (t.size() > 0) {
> > + Pattern itemSet = t.poll();
> > List<A> frequentPattern = new ArrayList<A>();
> > for (int j = 0; j < itemSet.length(); j++) {
> > frequentPattern.add(reverseMapping.get(itemSet.getPattern()[j]));
> > }
> > - Pair<List<A>, Long> returnItemSet = new Pair<List<A>, Long>(
> > - frequentPattern, itemSet.support());
> > + Collections.sort(frequentPattern);
> > +
> > + Pair<List<A>, Long> returnItemSet = new Pair<List<A>,
> Long>(frequentPattern, itemSet.support());
> > perAttributePatterns.add(returnItemSet);
> > }
> > + Collections.reverse(perAttributePatterns);
> > +
> > collector.collect(reverseMapping.get(key), perAttributePatterns);
> > }
> > }
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
> Thu Jan 7 16:45:37 2010
> > @@ -21,14 +21,16 @@
> > import java.util.List;
> > import java.util.Map;
> >
> > -public class TransactionIterator<AP> implements Iterator<int[]> {
> > +import org.apache.mahout.common.Pair;
> > +
> > +public class TransactionIterator<AP> implements Iterator<Pair<int[],
> Long>> {
> > private Map<AP, Integer> attributeIdMapping = null;
> >
> > - private Iterator<List<AP>> iterator = null;
> > + private Iterator<Pair<List<AP>, Long>> iterator = null;
> >
> > private int[] transactionBuffer = null;
> >
> > - public TransactionIterator(Iterator<List<AP>> iterator,
> > + public TransactionIterator(Iterator<Pair<List<AP>, Long>> iterator,
> > Map<AP, Integer> attributeIdMapping) {
> > this.attributeIdMapping = attributeIdMapping;
> > this.iterator = iterator;
> > @@ -41,18 +43,19 @@
> > }
> >
> > @Override
> > - public final int[] next() {
> > - List<AP> transaction = iterator.next();
> > + public final Pair<int[], Long> next() {
> > + Pair<List<AP>, Long> transaction = iterator.next();
> > int index = 0;
> > - for (AP Attribute : transaction) {
> > - if (attributeIdMapping.containsKey(Attribute)) {
> > - transactionBuffer[index++] = attributeIdMapping.get(Attribute);
> > +
> > + for (AP attribute : transaction.getFirst()) {
> > + if (attributeIdMapping.containsKey(attribute)) {
> > + transactionBuffer[index++] = attributeIdMapping.get(attribute);
> > }
> > }
> >
> > int[] transactionList = new int[index];
> > System.arraycopy(transactionBuffer, 0, transactionList, 0, index);
> > - return transactionList;
> > + return new Pair<int[], Long>(transactionList,
> transaction.getSecond());
> >
> > }
> >
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
> Thu Jan 7 16:45:37 2010
> > @@ -42,7 +42,7 @@
> > @Override
> > public void collect(Integer key, List<Pair<List<Integer>, Long>> value)
> > throws IOException {
> > - String StringKey = featureReverseMap.get(key);
> > + String stringKey = featureReverseMap.get(key);
> > List<Pair<List<String>, Long>> stringValues = new
> ArrayList<Pair<List<String>, Long>>();
> > for (Pair<List<Integer>, Long> e : value) {
> > List<String> pattern = new ArrayList<String>();
> > @@ -53,7 +53,7 @@
> > }
> >
> > collector
> > - .collect(new Text(StringKey), new
> TopKStringPatterns(stringValues));
> > + .collect(new Text(stringKey), new
> TopKStringPatterns(stringValues));
> > }
> >
> > }
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
> Thu Jan 7 16:45:37 2010
> > @@ -25,7 +25,7 @@
> > public final class IntegerTupleIterator implements
> Iterator<List<Integer>> {
> >
> > private Iterator<IntegerTuple> iterator = null;
> > -
> > +
> > public IntegerTupleIterator(Iterator<IntegerTuple> iterator) {
> > this.iterator = iterator;
> > }
> >
> > Modified:
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
> > URL:
> http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java?rev=896922&r1=896921&r2=896922&view=diff
> >
> ==============================================================================
> > ---
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
> (original)
> > +++
> lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
> Thu Jan 7 16:45:37 2010
> > @@ -55,10 +55,12 @@
> > Pair<List<String>, Long> myItem = null;
> > Pair<List<String>, Long> otherItem = null;
> > for (int i = 0; i < heapSize; i++) {
> > - if (myItem == null && myIterator.hasNext())
> > + if (myItem == null && myIterator.hasNext()) {
> > myItem = myIterator.next();
> > - if (otherItem == null && otherIterator.hasNext())
> > + }
> > + if (otherItem == null && otherIterator.hasNext()) {
> > otherItem = otherIterator.next();
> > + }
> > if (myItem != null && otherItem != null) {
> > int cmp = myItem.getSecond().compareTo(otherItem.getSecond());
> > if (cmp == 0) {
> > @@ -89,8 +91,9 @@
> > } else if (otherItem != null) {
> > patterns.add(otherItem);
> > otherItem = null;
> > - } else
> > + } else {
> > break;
> > + }
> > }
> > return new TopKStringPatterns(patterns);
> > }
> >
> >
> >
>