You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ro...@apache.org on 2009/10/19 00:19:57 UTC
svn commit: r826539 [1/2] - in /lucene/mahout/trunk:
core/src/main/java/org/apache/mahout/common/
core/src/main/java/org/apache/mahout/common/cache/
core/src/main/java/org/apache/mahout/fpm/
core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ core/src/...
Author: robinanil
Date: Sun Oct 18 22:19:56 2009
New Revision: 826539
URL: http://svn.apache.org/viewvc?rev=826539&view=rev
Log:
MAHOUT-157 Parallel Frequent Pattern Mining using FPGrowth
Added:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPGrowth.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTree.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPTreeDepthCache.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FrequentPatternMaxHeap.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/Pattern.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/package.html
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthTest.java
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/
lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/
lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthJob.java
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/IntegerTuple.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * An Ordered List of Integers which can be used in a Hadoop Map/Reduce Job
+ *
+ *
+ */
+public class IntegerTuple implements Writable, WritableComparable<IntegerTuple> {
+
+ private List<Integer> tuple = new ArrayList<Integer>();
+
+ public IntegerTuple() {
+ }
+
+ public IntegerTuple(Integer firstEntry) {
+ add(firstEntry);
+ }
+
+ public IntegerTuple(Collection<Integer> entries) {
+ for(Integer entry: entries)
+ add(entry);
+ }
+
+ public IntegerTuple(Integer[] entries) {
+ for(Integer entry: entries)
+ add(entry);
+ }
+
+ /**
+ * add an entry to the end of the list
+ *
+ * @param entry
+ * @return true if the items get added
+ */
+ public boolean add(Integer entry) {
+ return tuple.add(entry);
+ }
+
+ /**
+ * Fetches the integer at the given location
+ *
+ * @param index
+ * @return Integer value at the given location in the tuple list
+ */
+ public Integer integerAt(int index) {
+ return tuple.get(index);
+ }
+
+ /**
+ * Replaces the integer at the given index with the given newInteger
+ *
+ * @param index
+ * @param newInteger
+ * @return The previous value at that location
+ */
+ public Integer replaceAt(int index, Integer newInteger) {
+ return tuple.set(index, newInteger);
+ }
+
+ /**
+ * Fetch the list of entries from the tuple
+ *
+ * @return a List containing the integers in the order of insertion
+ */
+ public List<Integer> getEntries() {
+ return Collections.unmodifiableList(this.tuple);
+ }
+
+ /**
+ * Returns the length of the tuple
+ *
+ * @return length
+ */
+ public int length() {
+ return this.tuple.size();
+ }
+
+ @Override
+ public String toString() {
+ return tuple.toString();
+ };
+
+ @Override
+ public int hashCode() {
+ return tuple.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ IntegerTuple other = (IntegerTuple) obj;
+ if (tuple == null) {
+ if (other.tuple != null)
+ return false;
+ } else if (!tuple.equals(other.tuple))
+ return false;
+ return true;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int len = in.readInt();
+ tuple = new ArrayList<Integer>(len);
+ for (int i = 0; i < len; i++) {
+ int data = in.readInt();
+ tuple.add(data);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(tuple.size());
+ for (Integer entry : tuple) {
+ out.writeInt(entry);
+ }
+ }
+
+ @Override
+ public int compareTo(IntegerTuple otherTuple) {
+ int min = Math.min(this.length(), otherTuple.length());
+ for (int i = 0; i < min; i++) {
+ int ret = this.tuple.get(i).compareTo(otherTuple.integerAt(i));
+ if (ret == 0)
+ continue;
+ return ret;
+ }
+ return this.length() - otherTuple.length();
+ }
+
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class StringRecordIterator implements Iterator<List<String>> {
+
+ private Iterator<String> lineIterator;
+ private Pattern splitter = null;
+ public StringRecordIterator(FileLineIterable iterable, String pattern)
+ {
+ this.lineIterator = iterable.iterator();
+ this.splitter = Pattern.compile(pattern);
+ }
+ @Override
+ public boolean hasNext() {
+ return lineIterator.hasNext();
+ }
+
+ @Override
+ public List<String> next() {
+ String line = lineIterator.next();
+ String[] items = splitter.split(line);
+ return Arrays.asList(items);
+ }
+
+ @Override
+ public void remove() {
+ lineIterator.remove();
+ }
+
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/cache/LeastKCache.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.cache;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+public class LeastKCache<K extends Comparable<? super K>, V> implements
+ Cache<K, V> {
+
+ private int capacity = 0;
+
+ private Map<K, V> cache = null;
+
+ private PriorityQueue<K> queue = null;
+
+ public LeastKCache(final int capacity) {
+
+ this.capacity = capacity;
+
+ cache = new HashMap<K, V>(capacity);
+ queue = new PriorityQueue<K>(capacity, new Comparator<K>() {
+
+ @Override
+ public int compare(K o1, K o2) {
+ return o2.compareTo(o1);
+ }
+
+ });
+
+ }
+
+ @Override
+ final public long capacity() {
+ return capacity;
+ }
+
+ @Override
+ final public V get(K key) {
+ return cache.get(key);
+ }
+
+ @Override
+ final public void set(K key, V value) {
+ if (contains(key) == false) {
+ queue.add(key);
+ }
+ cache.put(key, value);
+ while (queue.size() > capacity) {
+ K k = queue.poll();
+ cache.remove(k);
+ }
+ }
+
+ @Override
+ final public long size() {
+ return cache.size();
+ }
+
+ @Override
+ final public boolean contains(K key) {
+ return (cache.containsKey(key));
+ }
+
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorMapper.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ *
+ * {@link AggregatorMapper} outputs the pattern for each item in the pattern, so
+ * that reducer can group them and select the top K frequent patterns
+ *
+ */
+public class AggregatorMapper extends
+ Mapper<Text, TopKStringPatterns, Text, TopKStringPatterns> {
+
+ @Override
+ protected void map(Text key, TopKStringPatterns values, Context context)
+ throws IOException, InterruptedException {
+ for (Pair<List<String>, Long> pattern : values.getPatterns()) {
+ for (String item : pattern.getFirst()) {
+ List<Pair<List<String>, Long>> patternSingularList = new ArrayList<Pair<List<String>, Long>>();
+ patternSingularList.add(pattern);
+ context.setStatus("Aggregator Mapper:Grouping Patterns for "+item);
+ context.write(new Text(item),
+ new TopKStringPatterns(patternSingularList));
+ }
+ }
+
+ }
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/AggregatorReducer.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ *
+ * {@link AggregatorReducer} groups all Frequent Patterns containing an item and
+ * outputs the top K patterns containing that particular item
+ *
+ */
+public class AggregatorReducer extends
+ Reducer<Text, TopKStringPatterns, Text, TopKStringPatterns> {
+
+ private int maxHeapSize = 50;
+
+ @Override
+ protected void reduce(Text key, Iterable<TopKStringPatterns> values,
+ Context context) throws IOException, InterruptedException {
+ TopKStringPatterns patterns = new TopKStringPatterns();
+ Iterator<TopKStringPatterns> it = values.iterator();
+ while (it.hasNext()) {
+ context.setStatus("Aggregator Reducer: Selecting TopK patterns for: " + key);
+ patterns = patterns.merge(it.next(), maxHeapSize);
+ }
+ context.write(key, patterns);
+
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ super.setup(context);
+ Parameters params = Parameters.fromString(context.getConfiguration().get(
+ "pfp.parameters", ""));
+ maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50"));
+
+ }
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.mahout.common.IntegerTuple;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * Parallel FP Growth Driver Class. Runs each stage of PFPGrowth as described in
+ * the paper http://infolab.stanford.edu/~echang/recsys08-69.pdf
+ *
+ */
+public class PFPGrowth {
+ private static final Logger log = LoggerFactory.getLogger(PFPGrowth.class);
+
+ public static Pattern SPLITTER = Pattern.compile("[ ,\t]*[,|\t][ ,\t]*");
+
+ /**
+ *
+ * @param params params should contain input and output locations as a string
+ * value, the additional parameters include minSupport(3),
+ * maxHeapSize(50), numGroups(50)
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ public static void runPFPGrowth(Parameters params) throws IOException,
+ InterruptedException, ClassNotFoundException {
+ startParallelCounting(params);
+ startGroupingItems(params);
+ startParallelFPGrowth(params);
+ startAggregating(params);
+ }
+
+ /**
+ * Converts a given Map in to a String using DefaultStringifier of Hadoop
+ *
+ * @param map
+ * @param conf
+ * @return string representation of the map
+ * @throws IOException
+ */
+ private static String serializeMap(Map<String, Long> map, Configuration conf)
+ throws IOException {
+ conf
+ .set(
+ "io.serializations",
+ "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
+ DefaultStringifier<Map<String, Long>> mapStringifier = new DefaultStringifier<Map<String, Long>>(
+ conf, GenericsUtil.getClass(map));
+ String serializedMapString = mapStringifier.toString(map);
+ return serializedMapString;
+ }
+
+ /**
+ * Generates the gList(Group ID Mapping of Various frequent Features) Map from
+ * the corresponding serialized representation
+ *
+ * @param params
+ * @param key
+ * @param conf
+ * @return the map deserialized from the string stored under key in params
+ * @throws IOException
+ */
+ public static Map<String, Long> deserializeMap(Parameters params, String key,
+ Configuration conf) throws IOException {
+ Map<String, Long> map = new HashMap<String, Long>();
+ conf
+ .set(
+ "io.serializations",
+ "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
+
+ DefaultStringifier<Map<String, Long>> mapStringifier = new DefaultStringifier<Map<String, Long>>(
+ conf, GenericsUtil.getClass(map));
+ String gListString = mapStringifier.toString(map);
+ gListString = params.get(key, gListString);
+ map = mapStringifier.fromString(gListString);
+ return map;
+ }
+
+ /**
+ * Serializes the fList and returns the string representation of the List
+ *
+ * @param list
+ * @param conf
+ * @return the serialized string form of the list
+ * @throws IOException
+ */
+ private static String serializeList(List<Pair<String, Long>> list,
+ Configuration conf) throws IOException {
+ conf
+ .set(
+ "io.serializations",
+ "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
+ DefaultStringifier<List<Pair<String, Long>>> listStringifier = new DefaultStringifier<List<Pair<String, Long>>>(
+ conf, GenericsUtil.getClass(list));
+ String serializedListString = listStringifier.toString(list);
+ return serializedListString;
+ }
+
+ /**
+ * Generates the fList from the serialized string representation
+ *
+ * @param params
+ * @param key
+ * @param conf
+ * @return the list deserialized from the string stored under key in params
+ * @throws IOException
+ */
+ public static List<Pair<String, Long>> deserializeList(Parameters params,
+ String key, Configuration conf) throws IOException {
+ List<Pair<String, Long>> list = new ArrayList<Pair<String, Long>>();
+ conf
+ .set(
+ "io.serializations",
+ "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
+
+ DefaultStringifier<List<Pair<String, Long>>> listStringifier = new DefaultStringifier<List<Pair<String, Long>>>(
+ conf, GenericsUtil.getClass(list));
+ String serializedString = listStringifier.toString(list);
+ serializedString = params.get(key, serializedString);
+ list = listStringifier.fromString(serializedString);
+ return list;
+ }
+
+ /**
+ * Count the frequencies of various features in parallel using Map/Reduce
+ *
+ * @param params
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ */
+ public static void startParallelCounting(Parameters params)
+ throws IOException, InterruptedException, ClassNotFoundException {
+
+ Configuration conf = new Configuration();
+ conf.set("pfp.parameters", params.toString());
+
+ String input = params.get("input");
+ Job job = new Job(conf, "Parallel Counting Driver running over input: "
+ + input);
+ job.setJarByClass(PFPGrowth.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(LongWritable.class);
+
+ FileInputFormat.addInputPath(job, new Path(input));
+ Path outPath = new Path(params.get("output") + "/parallelcounting");
+ FileOutputFormat.setOutputPath(job, outPath);
+
+ FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
+ if (dfs.exists(outPath)) {
+ dfs.delete(outPath, true);
+ }
+
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setMapperClass(ParallelCountingMapper.class);
+ job.setCombinerClass(ParallelCountingReducer.class);
+ job.setReducerClass(ParallelCountingReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+ job.waitForCompletion(true);
+
+ }
+
+ /**
+ * read the feature frequency List which is built at the end of the Parallel
+ * counting job
+ *
+ * @param params
+ * @return features with a count of at least minSupport, ordered by descending count, then by feature
+ * @throws IOException
+ */
+ public static List<Pair<String, Long>> readFList(Parameters params)
+ throws IOException {
+ Writable key = new Text();
+ LongWritable value = new LongWritable();
+ int minSupport = Integer.valueOf(params.get("minSupport", "3"));
+ Configuration conf = new Configuration();
+
+ FileSystem fs = FileSystem.get(new Path(params.get("output")
+ + "/parallelcounting").toUri(), conf);
+ FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output")
+ + "/parallelcounting/part-*"));
+
+ PriorityQueue<Pair<String, Long>> queue = new PriorityQueue<Pair<String, Long>>(
+ 11, new Comparator<Pair<String, Long>>() {
+
+ @Override
+ public int compare(Pair<String, Long> o1, Pair<String, Long> o2) {
+ int ret = o2.getSecond().compareTo(o1.getSecond());
+ if (ret != 0)
+ return ret;
+ return o1.getFirst().compareTo(o2.getFirst());
+ }
+
+ });
+ for (FileStatus fileStatus : outputFiles) {
+ Path path = fileStatus.getPath();
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+ // key is the feature, value is its count
+ while (reader.next(key, value)) {
+ if (value.get() < minSupport)
+ continue;
+ queue.add(new Pair<String, Long>(key.toString(), value.get()));
+ }
+ }
+ List<Pair<String, Long>> fList = new ArrayList<Pair<String, Long>>();
+ while (queue.isEmpty() == false)
+ fList.add(queue.poll());
+ return fList;
+ }
+
+ /**
+ * Read the Frequent Patterns from the part files under the frequentPatterns output directory
+ *
+ * @param params
+ * @return List of TopK patterns for each string frequent feature
+ * @throws IOException
+ */
+ public static List<Pair<String, TopKStringPatterns>> readFrequentPattern(
+ Parameters params) throws IOException {
+
+ Configuration conf = new Configuration();
+
+ FileSystem fs = FileSystem.get(new Path(params.get("output")
+ + "/frequentPatterns").toUri(), conf);
+ FileStatus[] outputFiles = fs.globStatus(new Path(params.get("output")
+ + "/frequentPatterns/part-*"));
+
+ List<Pair<String, TopKStringPatterns>> ret = new ArrayList<Pair<String, TopKStringPatterns>>();
+ for (FileStatus fileStatus : outputFiles) {
+ Path path = fileStatus.getPath();
+ ret.addAll(FPGrowth.readFrequentPattern(fs, conf, path));
+ }
+ return ret;
+ }
+
+ /**
+ * Group the given Features into g groups as defined by the numGroups
+ * parameter in params
+ *
+ * @param params
+ * @throws IOException
+ */
+ public static void startGroupingItems(Parameters params) throws IOException {
+ Configuration conf = new Configuration();
+ List<Pair<String, Long>> fList = readFList(params);
+ Integer numGroups = Integer.valueOf(params.get("numGroups", "50"));
+
+ Map<String, Long> gList = new HashMap<String, Long>();
+ long groupID = 0;
+ long i = 0;
+ long maxPerGroup = fList.size() / numGroups;
+ if (fList.size() != maxPerGroup * numGroups)
+ maxPerGroup = maxPerGroup + 1;
+
+ for (Pair<String, Long> featureFreq : fList) {
+ String feature = featureFreq.getFirst();
+ if (i / (maxPerGroup) == groupID) {
+ gList.put(feature, groupID);
+ } else {
+ groupID++;
+ gList.put(feature, groupID);
+ }
+ i++;
+ }
+
+ log.info("No of Features: {}", fList.size());
+
+ params.set("gList", serializeMap(gList, conf));
+ params.set("fList", serializeList(fList, conf));
+ }
+
+ /**
+ * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of
+ * group dependent shards
+ *
+ * @param params
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ */
+ public static void startParallelFPGrowth(Parameters params)
+ throws IOException, InterruptedException, ClassNotFoundException {
+
+ Configuration conf = new Configuration();
+ conf.set("pfp.parameters", params.toString());
+
+ String input = params.get("input");
+ Job job = new Job(conf, "PFP Growth Driver running over input" + input);
+ job.setJarByClass(PFPGrowth.class);
+
+ job.setMapOutputKeyClass(LongWritable.class);
+ job.setMapOutputValueClass(IntegerTuple.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(TopKStringPatterns.class);
+
+ FileInputFormat.addInputPath(job, new Path(input));
+ Path outPath = new Path(params.get("output") + "/fpgrowth");
+ FileOutputFormat.setOutputPath(job, outPath);
+
+ FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
+ if (dfs.exists(outPath)) {
+ dfs.delete(outPath, true);
+ }
+
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setMapperClass(ParallelFPGrowthMapper.class);
+ job.setReducerClass(ParallelFPGrowthReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+ job.waitForCompletion(true);
+ }
+
+ /**
+ * Run the aggregation Job to aggregate the different TopK patterns and group
+ * each Pattern by the features present in it and thus calculate the final Top
+ * K frequent Patterns for each feature
+ *
+ * @param params
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ */
+ public static void startAggregating(Parameters params) throws IOException,
+ InterruptedException, ClassNotFoundException {
+
+ Configuration conf = new Configuration();
+ params.set("fList", "");
+ params.set("gList", "");
+ conf.set("pfp.parameters", params.toString());
+
+ String input = params.get("output") + "/fpgrowth";
+ Job job = new Job(conf, "PFP Aggregator Driver running over input: "
+ + input);
+ job.setJarByClass(PFPGrowth.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(TopKStringPatterns.class);
+
+ FileInputFormat.addInputPath(job, new Path(input));
+ Path outPath = new Path(params.get("output") + "/frequentPatterns");
+ FileOutputFormat.setOutputPath(job, outPath);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setMapperClass(AggregatorMapper.class);
+ job.setCombinerClass(AggregatorReducer.class);
+ job.setReducerClass(AggregatorReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+ FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
+ if (dfs.exists(outPath)) {
+ dfs.delete(outPath, true);
+ }
+ job.waitForCompletion(true);
+ }
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.common.Parameters;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/**
+ *
+ * {@link ParallelCountingMapper} emits every item of a transaction with a
+ * count of one, in the same way as the Hadoop WordCount example.
+ *
+ */
+public class ParallelCountingMapper extends
+    Mapper<LongWritable, Text, Text, LongWritable> {
+
+  /** Shared count value written for every emitted item. */
+  private static final LongWritable one = new LongWritable(1);
+
+  /** Pattern used to split an input line into items; compiled in setup. */
+  private Pattern splitter = null;
+
+  /**
+   * Reads the "pfp.parameters" entry of the job configuration and compiles the
+   * "splitPattern" parameter, falling back to {@code PFPGrowth.SPLITTER}.
+   */
+  @Override
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
+    super.setup(context);
+    String serialized = context.getConfiguration().get("pfp.parameters", "");
+    Parameters params = Parameters.fromString(serialized);
+    String regex = params.get("splitPattern", PFPGrowth.SPLITTER.toString());
+    splitter = Pattern.compile(regex);
+  }
+
+  /**
+   * Splits the input line and writes (item, 1) for every item that is not
+   * blank after trimming. The item is written untrimmed.
+   */
+  @Override
+  protected void map(LongWritable offset, Text input, Context context)
+      throws IOException, InterruptedException {
+    for (String item : splitter.split(input.toString())) {
+      if (item.trim().length() != 0) {
+        context.setStatus("Parallel Counting Mapper: " + item);
+        context.write(new Text(item), one);
+      }
+    }
+  }
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingReducer.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * {@link ParallelCountingReducer} sums up the counts of an item and outputs
+ * the item together with its total count. It is a simple summing reducer and
+ * can also be used as a local Combiner.
+ */
+public class ParallelCountingReducer extends
+    Reducer<Text, LongWritable, Text, LongWritable> {
+
+  /**
+   * Adds up all counts received for {@code key} and writes the key together
+   * with the total.
+   *
+   * @param key the item being counted
+   * @param values the partial counts for the item
+   * @param context the reducer context used for status updates and output
+   * @throws IOException if the context fails to write the result
+   * @throws InterruptedException if the write is interrupted
+   */
+  // @Override makes the compiler verify that this really replaces
+  // Reducer.reduce; without it a signature mismatch would silently leave the
+  // default reduce implementation in place.
+  @Override
+  protected void reduce(Text key, Iterable<LongWritable> values, Context context)
+      throws IOException, InterruptedException {
+    long sum = 0;
+    for (LongWritable value : values) {
+      context.setStatus("Parallel Counting Reducer :" + key);
+      sum += value.get();
+    }
+    context.setStatus("Parallel Counting Reducer: " + key + " => " + sum);
+    context.write(key, new LongWritable(sum));
+  }
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.common.IntegerTuple;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.Parameters;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.regex.Pattern;
+
+/**
+ * {@link ParallelFPGrowthMapper} maps each transaction to all the unique item
+ * groups present in the transaction. The mapper outputs the group id as the
+ * key and the transaction as the value.
+ *
+ */
+public class ParallelFPGrowthMapper extends
+    Mapper<LongWritable, Text, LongWritable, IntegerTuple> {
+
+  /** Item string to integer id; ids follow the order of the "fList" entries. */
+  private Map<String, Integer> fMap = new HashMap<String, Integer>();
+
+  /** Integer item id to the group id taken from the "gList" map. */
+  private Map<Integer, Long> gListInt = new HashMap<Integer, Long>();
+
+  /** Pattern used to split an input line into items; compiled in setup. */
+  private Pattern splitter = null;
+
+  /**
+   * Loads the "fList" and "gList" parameters into the lookup maps and compiles
+   * the "splitPattern" parameter (default {@code PFPGrowth.SPLITTER}).
+   */
+  @Override
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
+    super.setup(context);
+    Parameters params = Parameters.fromString(context.getConfiguration().get(
+        "pfp.parameters", ""));
+
+    // Number the features in the order in which the list yields them.
+    int nextId = 0;
+    for (Pair<String, Long> feature : PFPGrowth.deserializeList(params, "fList",
+        context.getConfiguration())) {
+      fMap.put(feature.getFirst(), nextId);
+      nextId++;
+    }
+
+    // Re-key the group map by integer feature id.
+    Map<String, Long> groupByName = PFPGrowth.deserializeMap(params, "gList",
+        context.getConfiguration());
+    for (Entry<String, Long> group : groupByName.entrySet()) {
+      gListInt.put(fMap.get(group.getKey()), group.getValue());
+    }
+
+    String regex = params.get("splitPattern", PFPGrowth.SPLITTER.toString());
+    splitter = Pattern.compile(regex);
+  }
+
+  /**
+   * Converts the line into a sorted array of known item ids and, walking the
+   * array from the last position to the first, writes one prefix of the array
+   * per group id the first time that group id is met.
+   */
+  @Override
+  protected void map(LongWritable offset, Text input, Context context)
+      throws IOException, InterruptedException {
+
+    // Keep only the tokens that are present in the feature map.
+    List<Integer> itemSet = new ArrayList<Integer>();
+    for (String token : splitter.split(input.toString())) {
+      Integer id = fMap.get(token);
+      if (id != null && token.trim().length() != 0) {
+        itemSet.add(id);
+      }
+    }
+    Collections.sort(itemSet);
+    Integer[] prunedItems = itemSet.toArray(new Integer[itemSet.size()]);
+
+    // Set.add returns true only for a group id that was not seen before.
+    Set<Long> seenGroups = new HashSet<Long>();
+    for (int last = prunedItems.length - 1; last >= 0; last--) {
+      Integer item = prunedItems[last];
+      Long groupID = gListInt.get(item);
+      if (seenGroups.add(groupID)) {
+        int prefixLength = last + 1;
+        Integer[] prefix = new Integer[prefixLength];
+        System.arraycopy(prunedItems, 0, prefix, 0, prefixLength);
+        context.setStatus(
+            "Parallel FPGrowth: Generating Group Dependent transactions for: "
+                + item);
+        context.write(new LongWritable(groupID), new IntegerTuple(prefix));
+      }
+    }
+  }
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.IntegerTuple;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.fpm.pfpgrowth.convertors.ContextWriteOutputCollector;
+import org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerStringOutputConvertor;
+import org.apache.mahout.fpm.pfpgrowth.convertors.integer.IntegerTupleIterator;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPTreeDepthCache;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * {@link ParallelFPGrowthReducer} takes each group of transactions, runs
+ * vanilla FPGrowth on it and outputs the Top K frequent Patterns for each
+ * group.
+ *
+ */
+
+public class ParallelFPGrowthReducer extends
+ Reducer<LongWritable, IntegerTuple, Text, TopKStringPatterns> {
+
+ private List<Pair<Integer, Long>> fList = new ArrayList<Pair<Integer, Long>>();
+
+ private List<String> featureReverseMap = new ArrayList<String>();
+
+ private Map<String, Integer> fMap = new HashMap<String, Integer>();
+
+ private Map<Long, List<Integer>> groupFeatures = new HashMap<Long, List<Integer>>();
+
+ private int maxHeapSize = 50;
+
+ private int minSupport = 3;
+
+ @Override
+ public void reduce(LongWritable key, Iterable<IntegerTuple> values,
+ Context context) throws IOException {
+ FPGrowth<Integer> fpGrowth = new FPGrowth<Integer>();
+ fpGrowth
+ .generateTopKFrequentPatterns(
+ new IntegerTupleIterator(values.iterator()),
+ fList,
+ minSupport,
+ maxHeapSize,
+ new HashSet<Integer>(groupFeatures.get(key.get())),
+ new IntegerStringOutputConvertor(
+ new ContextWriteOutputCollector<LongWritable, IntegerTuple, Text, TopKStringPatterns>(
+ context), featureReverseMap));
+ }
+
+ @Override
+ public void setup(Context context) throws IOException, InterruptedException {
+
+ super.setup(context);
+ Parameters params = Parameters.fromString(context.getConfiguration().get(
+ "pfp.parameters", ""));
+
+
+
+ int i = 0;
+ for(Pair<String, Long> e: PFPGrowth.deserializeList(params, "fList", context
+ .getConfiguration()))
+ {
+ featureReverseMap.add(e.getFirst());
+ fMap.put(e.getFirst(), i);
+ fList.add(new Pair<Integer, Long>(i++, e.getSecond()));
+ }
+
+ Map<String, Long> gList = PFPGrowth.deserializeMap(params, "gList", context
+ .getConfiguration());
+
+ for (Entry<String, Long> entry : gList.entrySet()) {
+ List<Integer> groupList = groupFeatures.get(entry.getValue());
+ Integer itemInteger = fMap.get(entry.getKey());
+ if (groupList != null)
+ groupList.add(itemInteger);
+ else {
+ groupList = new ArrayList<Integer>();
+ groupList.add(itemInteger);
+ groupFeatures.put(entry.getValue(), groupList);
+ }
+
+ }
+ maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50"));
+ minSupport = Integer.valueOf(params.get("minSupport", "3"));
+ FPTreeDepthCache.FirstLevelCacheSize = Integer.valueOf(params
+ .get("treeCacheSize", Integer
+ .toString(FPTreeDepthCache.FirstLevelCacheSize)));
+ }
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/ContextWriteOutputCollector.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ContextWriteOutputCollector<IK extends Writable, IV extends Writable, K extends Writable, V extends Writable>
+ implements OutputCollector<K, V> {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(ContextWriteOutputCollector.class);
+
+ private Reducer<IK, IV, K, V>.Context context;
+
+ public ContextWriteOutputCollector(Reducer<IK, IV, K, V>.Context context)
+ throws IOException {
+ this.context = context;
+ }
+
+ @Override
+ public final void collect(K key, V value) throws IOException {
+ try {
+ context.setStatus("Writing Top K patterns for: " + key.toString());
+ context.write(key, value);
+ } catch (InterruptedException e) {
+ log.error("{}", e);
+ }
+ }
+
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/SequenceFileOutputCollector.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.OutputCollector;
+
+public class SequenceFileOutputCollector<K extends Writable, V extends Writable>
+    implements OutputCollector<K, V> {
+
+  /** Sequence file writer that receives every collected pair. */
+  private final SequenceFile.Writer writer;
+
+  /**
+   * Creates a collector that appends to the given writer. This class never
+   * closes the writer.
+   */
+  public SequenceFileOutputCollector(SequenceFile.Writer writer)
+      throws IOException {
+    this.writer = writer;
+  }
+
+  /** Appends the key/value pair to the sequence file writer. */
+  @Override
+  public final void collect(K key, V value) throws IOException {
+    this.writer.append(key, value);
+  }
+
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TopKPatternsOutputConvertor.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FrequentPatternMaxHeap;
+import org.apache.mahout.fpm.pfpgrowth.fpgrowth.Pattern;
+
+final public class TopKPatternsOutputConvertor<AttributePrimitive> implements
+ OutputCollector<Integer, FrequentPatternMaxHeap> {
+
+ private OutputCollector<AttributePrimitive, List<Pair<List<AttributePrimitive>, Long>>> collector = null;
+
+ private Map<Integer, AttributePrimitive> reverseMapping = null;
+
+ public TopKPatternsOutputConvertor(
+ OutputCollector<AttributePrimitive, List<Pair<List<AttributePrimitive>, Long>>> collector,
+ Map<Integer, AttributePrimitive> reverseMapping) {
+ this.collector = collector;
+ this.reverseMapping = reverseMapping;
+ }
+
+ @Override
+ final public void collect(Integer key, FrequentPatternMaxHeap value)
+ throws IOException {
+ List<Pair<List<AttributePrimitive>, Long>> perAttributePatterns = new ArrayList<Pair<List<AttributePrimitive>, Long>>();
+ for (Pattern itemSet : value.getHeap()) {
+ List<AttributePrimitive> frequentPattern = new ArrayList<AttributePrimitive>();
+ for (int j = 0; j < itemSet.length(); j++) {
+ frequentPattern.add(reverseMapping.get(itemSet.getPattern()[j]));
+ }
+ Pair<List<AttributePrimitive>, Long> returnItemSet = new Pair<List<AttributePrimitive>, Long>(
+ frequentPattern, itemSet.getSupport());
+ perAttributePatterns.add(returnItemSet);
+ }
+ collector.collect(reverseMapping.get(key), perAttributePatterns);
+ }
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class TransactionIterator<AttributePrimitive> implements Iterator<int[]> {
+ private Map<AttributePrimitive, Integer> attributeIdMapping = null;
+
+ private Iterator<List<AttributePrimitive>> iterator = null;
+
+ int[] transactionBuffer = null;
+
+ public TransactionIterator(Iterator<List<AttributePrimitive>> iterator,
+ Map<AttributePrimitive, Integer> attributeIdMapping) {
+ this.attributeIdMapping = attributeIdMapping;
+ this.iterator = iterator;
+ transactionBuffer = new int[attributeIdMapping.size()];
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public int[] next() {
+ List<AttributePrimitive> transaction = iterator.next();
+ int index = 0;
+ for (AttributePrimitive Attribute : transaction) {
+ if (attributeIdMapping.containsKey(Attribute)) {
+ transactionBuffer[index++] = attributeIdMapping.get(Attribute);
+ }
+ }
+
+ int[] transactionList = new int[index];
+ System.arraycopy(transactionBuffer, 0, transactionList, 0, index);
+ return transactionList;
+
+ }
+
+ @Override
+ public void remove() {
+ iterator.remove();
+ }
+
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerStringOutputConvertor.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors.integer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
+
+public final class IntegerStringOutputConvertor implements
+    OutputCollector<Integer, List<Pair<List<Integer>, Long>>> {
+
+  /** Collector that receives the string form of every result. */
+  OutputCollector<Text, TopKStringPatterns> collector = null;
+
+  /** Feature id to feature string; the list index is the id. */
+  List<String> featureReverseMap = null;
+
+  /**
+   * @param collector destination for the converted patterns
+   * @param featureReverseMap lookup from feature id to feature string
+   */
+  public IntegerStringOutputConvertor(
+      OutputCollector<Text, TopKStringPatterns> collector,
+      List<String> featureReverseMap) {
+    this.collector = collector;
+    this.featureReverseMap = featureReverseMap;
+  }
+
+  /**
+   * Replaces every feature id in the key and in the patterns by its string and
+   * passes the result on as a (Text, TopKStringPatterns) pair.
+   */
+  @Override
+  public final void collect(Integer key, List<Pair<List<Integer>, Long>> value)
+      throws IOException {
+    String featureName = featureReverseMap.get(key);
+
+    List<Pair<List<String>, Long>> namedPatterns = new ArrayList<Pair<List<String>, Long>>();
+    for (Pair<List<Integer>, Long> idPattern : value) {
+      List<String> names = new ArrayList<String>();
+      for (Integer featureId : idPattern.getFirst()) {
+        names.add(featureReverseMap.get(featureId));
+      }
+      Pair<List<String>, Long> named = new Pair<List<String>, Long>(names,
+          idPattern.getSecond());
+      namedPatterns.add(named);
+    }
+
+    Text outputKey = new Text(featureName);
+    collector.collect(outputKey, new TopKStringPatterns(namedPatterns));
+  }
+
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors.integer;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.mahout.common.IntegerTuple;
+
+public final class IntegerTupleIterator implements Iterator<List<Integer>> {
+
+  /** Wrapped iterator over the tuples. */
+  private Iterator<IntegerTuple> iterator = null;
+
+  /**
+   * @param iterator the tuples whose entries are exposed as lists
+   */
+  public IntegerTupleIterator(Iterator<IntegerTuple> iterator) {
+    this.iterator = iterator;
+  }
+
+  @Override
+  public final boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  /** Returns the entries of the next tuple. */
+  @Override
+  public final List<Integer> next() {
+    return iterator.next().getEntries();
+  }
+
+  /** Delegates to the wrapped iterator. */
+  @Override
+  public final void remove() {
+    iterator.remove();
+  }
+
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringOutputConvertor.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors.string;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.common.Pair;
+
+public final class StringOutputConvertor implements
+    OutputCollector<String, List<Pair<List<String>, Long>>> {
+
+  /** Collector that receives the wrapped key and patterns. */
+  OutputCollector<Text, TopKStringPatterns> collector = null;
+
+  /**
+   * @param collector destination for the wrapped output
+   */
+  public StringOutputConvertor(
+      OutputCollector<Text, TopKStringPatterns> collector) {
+    this.collector = collector;
+  }
+
+  /**
+   * Wraps the key in a {@link Text} and the pattern list in a
+   * {@link TopKStringPatterns} and forwards both to the wrapped collector.
+   */
+  @Override
+  public final void collect(String key, List<Pair<List<String>, Long>> value)
+      throws IOException {
+    Text outputKey = new Text(key);
+    TopKStringPatterns outputValue = new TopKStringPatterns(value);
+    collector.collect(outputKey, outputValue);
+  }
+
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors.string;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.mahout.common.StringTuple;
+
+/**
+ * Iterator view that turns a stream of {@link StringTuple} transactions into
+ * a stream of their entry lists. All three iterator operations are forwarded
+ * to the wrapped iterator.
+ */
+public final class StringTupleIterator implements Iterator<List<String>> {
+
+  /** The wrapped source of transactions. */
+  private final Iterator<StringTuple> iterator;
+
+  /**
+   * @param iterator
+   *          the transaction iterator to wrap
+   */
+  public StringTupleIterator(Iterator<StringTuple> iterator) {
+    this.iterator = iterator;
+  }
+
+  /** @return whatever the wrapped iterator reports for {@code hasNext()} */
+  @Override
+  public final boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  /** @return the entries of the next transaction of the wrapped iterator */
+  @Override
+  public final List<String> next() {
+    return iterator.next().getEntries();
+  }
+
+  /** Forwards the removal request to the wrapped iterator. */
+  @Override
+  public final void remove() {
+    iterator.remove();
+  }
+
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java?rev=826539&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/TopKStringPatterns.java Sun Oct 18 22:19:56 2009
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.convertors.string;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+
+/**
+ * A {@link Writable} list of frequent patterns, each stored as a pair of
+ * (item list, support count).
+ *
+ * Serialized layout, as produced by {@link #write(DataOutput)}: an int
+ * pattern count, then for every pattern an int item count, a long support and,
+ * for every item, an int byte length followed by the item's bytes as returned
+ * by {@code Bytes.toBytes(String)}.
+ */
+public final class TopKStringPatterns implements Writable {
+
+  /** Backing list; cleared and refilled by {@link #readFields(DataInput)}. */
+  private final List<Pair<List<String>, Long>> frequentPatterns;
+
+  /** Creates an empty pattern list. */
+  public TopKStringPatterns() {
+    frequentPatterns = new ArrayList<Pair<List<String>, Long>>();
+  }
+
+  /**
+   * Creates a pattern list holding a copy of the given list (the pairs
+   * themselves are not copied).
+   *
+   * @param patterns
+   *          the (pattern, support) pairs to start with
+   */
+  public TopKStringPatterns(List<Pair<List<String>, Long>> patterns) {
+    frequentPatterns = new ArrayList<Pair<List<String>, Long>>(patterns);
+  }
+
+  /** @return an iterator over the backing list of patterns */
+  public final Iterator<Pair<List<String>, Long>> iterator() {
+    return frequentPatterns.iterator();
+  }
+
+  /** @return the backing list itself, not a copy */
+  public final List<Pair<List<String>, Long>> getPatterns() {
+    return frequentPatterns;
+  }
+
+  /**
+   * Merges this list with another one into a new instance holding at most
+   * {@code heapSize} patterns. At every step the head of each list is
+   * compared and the one that compares greater (see
+   * {@link #comparePatterns(Pair, Pair)}) is emitted first; when both heads
+   * compare equal only one of them is emitted and both are consumed.
+   * NOTE(review): this is only a proper top-k merge if both inputs are already
+   * ordered from greatest to smallest - confirm against the callers.
+   *
+   * @param pattern
+   *          the other pattern list
+   * @param heapSize
+   *          maximum number of patterns in the result
+   * @return a new instance with the merged patterns; neither input is modified
+   */
+  public final TopKStringPatterns merge(TopKStringPatterns pattern, int heapSize) {
+    List<Pair<List<String>, Long>> merged = new ArrayList<Pair<List<String>, Long>>();
+    Iterator<Pair<List<String>, Long>> mine = frequentPatterns.iterator();
+    Iterator<Pair<List<String>, Long>> theirs = pattern.iterator();
+    Pair<List<String>, Long> left = null;
+    Pair<List<String>, Long> right = null;
+
+    // Every pass through the loop emits exactly one pattern, so the number of
+    // passes bounds the size of the result.
+    int emitted = 0;
+    while (emitted < heapSize) {
+      // Refill whichever head has been consumed.
+      if (left == null && mine.hasNext()) {
+        left = mine.next();
+      }
+      if (right == null && theirs.hasNext()) {
+        right = theirs.next();
+      }
+
+      if (left == null && right == null) {
+        // Both inputs are exhausted.
+        break;
+      }
+
+      if (right == null) {
+        merged.add(left);
+        left = null;
+      } else if (left == null) {
+        merged.add(right);
+        right = null;
+      } else {
+        int order = comparePatterns(left, right);
+        if (order > 0) {
+          merged.add(left);
+          left = null;
+        } else {
+          merged.add(right);
+          right = null;
+          if (order == 0) {
+            // Same pattern on both sides: drop our copy so it is not emitted twice.
+            left = null;
+          }
+        }
+      }
+      emitted++;
+    }
+    return new TopKStringPatterns(merged);
+  }
+
+  /**
+   * Orders two patterns by support, then by number of items, then by the first
+   * differing item (compared position by position).
+   *
+   * @return a negative, zero or positive value when {@code a} is respectively
+   *         less than, equal to or greater than {@code b}
+   */
+  private static int comparePatterns(Pair<List<String>, Long> a,
+      Pair<List<String>, Long> b) {
+    int order = a.getSecond().compareTo(b.getSecond());
+    if (order != 0) {
+      return order;
+    }
+    List<String> aItems = a.getFirst();
+    List<String> bItems = b.getFirst();
+    order = aItems.size() - bItems.size();
+    if (order != 0) {
+      return order;
+    }
+    for (int j = 0; j < aItems.size(); j++) {
+      order = aItems.get(j).compareTo(bItems.get(j));
+      if (order != 0) {
+        return order;
+      }
+    }
+    return order;
+  }
+
+  /**
+   * Replaces the current contents with the patterns read from {@code in},
+   * using the layout described on {@link #write(DataOutput)}.
+   *
+   * @throws IOException
+   *           if reading from {@code in} fails
+   */
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    frequentPatterns.clear();
+    int patternCount = in.readInt();
+    for (int p = 0; p < patternCount; p++) {
+      List<String> items = new ArrayList<String>();
+      // The item count precedes the support on the wire.
+      int itemCount = in.readInt();
+      long support = in.readLong();
+      for (int k = 0; k < itemCount; k++) {
+        byte[] raw = new byte[in.readInt()];
+        in.readFully(raw);
+        items.add(Bytes.toString(raw));
+      }
+      frequentPatterns.add(new Pair<List<String>, Long>(items, support));
+    }
+  }
+
+  /**
+   * Writes the pattern count, then for each pattern its item count, its
+   * support and each item as a length-prefixed byte array.
+   *
+   * @throws IOException
+   *           if writing to {@code out} fails
+   */
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    out.writeInt(frequentPatterns.size());
+    for (Pair<List<String>, Long> entry : frequentPatterns) {
+      List<String> items = entry.getFirst();
+      out.writeInt(items.size());
+      out.writeLong(entry.getSecond());
+      for (String item : items) {
+        byte[] raw = Bytes.toBytes(item);
+        out.writeInt(raw.length);
+        out.write(raw);
+      }
+    }
+  }
+
+  /**
+   * @return the string forms of all patterns joined with {@code ", "}; the
+   *         empty string when there are no patterns
+   */
+  @Override
+  public final String toString() {
+    StringBuilder text = new StringBuilder();
+    Iterator<Pair<List<String>, Long>> it = frequentPatterns.iterator();
+    while (it.hasNext()) {
+      text.append(it.next().toString());
+      if (it.hasNext()) {
+        text.append(", ");
+      }
+    }
+    return text.toString();
+  }
+}