You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ro...@apache.org on 2010/10/05 00:49:02 UTC
svn commit: r1004466 - in /mahout/trunk/core/src:
main/java/org/apache/mahout/fpm/pfpgrowth/
test/java/org/apache/mahout/fpm/pfpgrowth/
Author: robinanil
Date: Mon Oct 4 22:49:02 2010
New Revision: 1004466
URL: http://svn.apache.org/viewvc?rev=1004466&view=rev
Log:
Replace hardcoded paths and parameter keys in PFPGrowth with public static final constants, and use temp dirs in the test
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=1004466&r1=1004465&r2=1004466&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Mon Oct 4 22:49:02 2010
@@ -57,6 +57,24 @@ import org.slf4j.LoggerFactory;
*
*/
public final class PFPGrowth {
+
+ public static final String ENCODING = "encoding";
+ public static final String F_LIST = "fList";
+ public static final String G_LIST = "gList";
+ public static final String NUM_GROUPS = "numGroups";
+ public static final String OUTPUT = "output";
+ public static final String MIN_SUPPORT = "minSupport";
+ public static final String MAX_HEAPSIZE = "maxHeapSize";
+ public static final String INPUT = "input";
+ public static final String PFP_PARAMETERS = "pfp.parameters";
+ public static final String FILE_PATTERN = "part-*";
+ public static final String FPGROWTH = "fpgrowth";
+ public static final String FREQUENT_PATTERNS = "frequentpatterns";
+ public static final String PARALLEL_COUNTING = "parallelcounting";
+ public static final String SORTED_OUTPUT = "sortedoutput";
+ public static final String SPLIT_PATTERN = "splitPattern";
+ public static final String TREE_CACHE_SIZE = "treeCacheSize";
+
public static final Pattern SPLITTER = Pattern.compile("[ ,\t]*[,|\t][ ,\t]*");
private static final Logger log = LoggerFactory.getLogger(PFPGrowth.class);
@@ -116,12 +134,12 @@ public final class PFPGrowth {
public static List<Pair<String,Long>> readFList(Parameters params) throws IOException {
Writable key = new Text();
LongWritable value = new LongWritable();
- int minSupport = Integer.valueOf(params.get("minSupport", "3"));
+ int minSupport = Integer.valueOf(params.get(MIN_SUPPORT, "3"));
Configuration conf = new Configuration();
-
- Path parallelCountingPath = new Path(params.get("output"), "parallelcounting");
+
+ Path parallelCountingPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING);
FileSystem fs = FileSystem.get(parallelCountingPath.toUri(), conf);
- FileStatus[] outputFiles = fs.globStatus(new Path(parallelCountingPath, "part-*"));
+ FileStatus[] outputFiles = fs.globStatus(new Path(parallelCountingPath, FILE_PATTERN));
PriorityQueue<Pair<String,Long>> queue = new PriorityQueue<Pair<String,Long>>(11,
new Comparator<Pair<String,Long>>() {
@@ -162,9 +180,9 @@ public final class PFPGrowth {
Configuration conf = new Configuration();
- Path frequentPatternsPath = new Path(params.get("output"), "frequentPatterns");
+ Path frequentPatternsPath = new Path(params.get(OUTPUT), FREQUENT_PATTERNS);
FileSystem fs = FileSystem.get(frequentPatternsPath.toUri(), conf);
- FileStatus[] outputFiles = fs.globStatus(new Path(frequentPatternsPath, "part-*"));
+ FileStatus[] outputFiles = fs.globStatus(new Path(frequentPatternsPath, FILE_PATTERN));
List<Pair<String,TopKStringPatterns>> ret = new ArrayList<Pair<String,TopKStringPatterns>>();
for (FileStatus fileStatus : outputFiles) {
@@ -199,21 +217,21 @@ public final class PFPGrowth {
ClassNotFoundException {
Configuration conf = new Configuration();
- params.set("fList", "");
- params.set("gList", "");
- conf.set("pfp.parameters", params.toString());
+ params.set(F_LIST, "");
+ params.set(G_LIST, "");
+ conf.set(PFP_PARAMETERS, params.toString());
conf.set("mapred.compress.map.output", "true");
conf.set("mapred.output.compression.type", "BLOCK");
- String input = params.get("output") + "/fpgrowth";
+ Path input = new Path(params.get(OUTPUT), FPGROWTH);
Job job = new Job(conf, "PFP Aggregator Driver running over input: " + input);
job.setJarByClass(PFPGrowth.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TopKStringPatterns.class);
- FileInputFormat.addInputPath(job, new Path(input));
- Path outPath = new Path(params.get("output"), "frequentPatterns");
+ FileInputFormat.addInputPath(job, input);
+ Path outPath = new Path(params.get(OUTPUT), FREQUENT_PATTERNS);
FileOutputFormat.setOutputPath(job, outPath);
job.setInputFormatClass(SequenceFileInputFormat.class);
@@ -235,7 +253,7 @@ public final class PFPGrowth {
public static void startGroupingItems(Parameters params) throws IOException {
Configuration conf = new Configuration();
List<Pair<String,Long>> fList = readFList(params);
- Integer numGroups = Integer.valueOf(params.get("numGroups", "50"));
+ Integer numGroups = Integer.valueOf(params.get(NUM_GROUPS, "50"));
Map<String,Long> gList = new HashMap<String,Long>();
long maxPerGroup = fList.size() / numGroups;
@@ -258,8 +276,8 @@ public final class PFPGrowth {
log.info("No of Features: {}", fList.size());
- params.set("gList", serializeMap(gList, conf));
- params.set("fList", serializeList(fList, conf));
+ params.set(G_LIST, serializeMap(gList, conf));
+ params.set(F_LIST, serializeList(fList, conf));
}
/**
@@ -270,12 +288,12 @@ public final class PFPGrowth {
ClassNotFoundException {
Configuration conf = new Configuration();
- conf.set("pfp.parameters", params.toString());
+ conf.set(PFP_PARAMETERS, params.toString());
conf.set("mapred.compress.map.output", "true");
conf.set("mapred.output.compression.type", "BLOCK");
- String input = params.get("input");
+ String input = params.get(INPUT);
Job job = new Job(conf, "Parallel Counting Driver running over input: " + input);
job.setJarByClass(PFPGrowth.class);
@@ -283,7 +301,7 @@ public final class PFPGrowth {
job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job, new Path(input));
- Path outPath = new Path(params.get("output"), "parallelcounting");
+ Path outPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING);
FileOutputFormat.setOutputPath(job, outPath);
HadoopUtil.overwriteOutput(outPath);
@@ -306,12 +324,12 @@ public final class PFPGrowth {
ClassNotFoundException {
Configuration conf = new Configuration();
- String gList = params.get("gList");
- params.set("gList", "");
- conf.set("pfp.parameters", params.toString());
+ String gList = params.get(G_LIST);
+ params.set(G_LIST, "");
+ conf.set(PFP_PARAMETERS, params.toString());
conf.set("mapred.compress.map.output", "true");
conf.set("mapred.output.compression.type", "BLOCK");
- String input = params.get("input");
+ String input = params.get(INPUT);
Job job = new Job(conf, "PFP Transaction Sorting running over input" + input);
job.setJarByClass(PFPGrowth.class);
@@ -322,7 +340,7 @@ public final class PFPGrowth {
job.setOutputValueClass(TransactionTree.class);
FileInputFormat.addInputPath(job, new Path(input));
- Path outPath = new Path(params.get("output") + "/sortedoutput");
+ Path outPath = new Path(params.get(OUTPUT), SORTED_OUTPUT);
FileOutputFormat.setOutputPath(job, outPath);
HadoopUtil.overwriteOutput(outPath);
@@ -333,7 +351,7 @@ public final class PFPGrowth {
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.waitForCompletion(true);
- params.set("gList", gList);
+ params.set(G_LIST, gList);
}
/**
@@ -344,10 +362,10 @@ public final class PFPGrowth {
ClassNotFoundException {
Configuration conf = new Configuration();
- conf.set("pfp.parameters", params.toString());
+ conf.set(PFP_PARAMETERS, params.toString());
conf.set("mapred.compress.map.output", "true");
conf.set("mapred.output.compression.type", "BLOCK");
- String input = params.get("output") + "/sortedoutput";
+ Path input = new Path(params.get(OUTPUT), SORTED_OUTPUT);
Job job = new Job(conf, "PFP Growth Driver running over input" + input);
job.setJarByClass(PFPGrowth.class);
@@ -357,8 +375,8 @@ public final class PFPGrowth {
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TopKStringPatterns.class);
- FileInputFormat.addInputPath(job, new Path(input));
- Path outPath = new Path(new Path(params.get("output")), "fpgrowth");
+ FileInputFormat.addInputPath(job, input);
+ Path outPath = new Path(params.get(OUTPUT), FPGROWTH);
FileOutputFormat.setOutputPath(job, outPath);
HadoopUtil.overwriteOutput(outPath);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java?rev=1004466&r1=1004465&r2=1004466&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java Mon Oct 4 22:49:02 2010
@@ -54,7 +54,7 @@ public class ParallelCountingMapper exte
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
- splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER.toString()));
+ Parameters params = Parameters.fromString(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
+ splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN, PFPGrowth.SPLITTER.toString()));
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java?rev=1004466&r1=1004465&r2=1004466&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java Mon Oct 4 22:49:02 2010
@@ -71,15 +71,15 @@ public class ParallelFPGrowthMapper exte
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
+ Parameters params = Parameters.fromString(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
OpenObjectIntHashMap<String> fMap = new OpenObjectIntHashMap<String>();
int i = 0;
- for (Pair<String,Long> e : PFPGrowth.deserializeList(params, "fList", context.getConfiguration())) {
+ for (Pair<String,Long> e : PFPGrowth.deserializeList(params, PFPGrowth.F_LIST, context.getConfiguration())) {
fMap.put(e.getFirst(), i++);
}
- for (Entry<String,Long> e : PFPGrowth.deserializeMap(params, "gList", context.getConfiguration())
+ for (Entry<String,Long> e : PFPGrowth.deserializeMap(params, PFPGrowth.G_LIST, context.getConfiguration())
.entrySet()) {
gListInt.put(fMap.get(e.getKey()), e.getValue());
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java?rev=1004466&r1=1004465&r2=1004466&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java Mon Oct 4 22:49:02 2010
@@ -109,16 +109,16 @@ public class ParallelFPGrowthReducer ext
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
+ Parameters params = Parameters.fromString(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
int i = 0;
- for (Pair<String,Long> e : PFPGrowth.deserializeList(params, "fList", context.getConfiguration())) {
+ for (Pair<String,Long> e : PFPGrowth.deserializeList(params, PFPGrowth.F_LIST, context.getConfiguration())) {
featureReverseMap.add(e.getFirst());
fMap.put(e.getFirst(), i++);
}
- Map<String,Long> gList = PFPGrowth.deserializeMap(params, "gList", context.getConfiguration());
+ Map<String,Long> gList = PFPGrowth.deserializeMap(params, PFPGrowth.G_LIST, context.getConfiguration());
for (Entry<String,Long> entry : gList.entrySet()) {
IntArrayList groupList = groupFeatures.get(entry.getValue());
@@ -132,9 +132,9 @@ public class ParallelFPGrowthReducer ext
}
}
- maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50"));
- minSupport = Integer.valueOf(params.get("minSupport", "3"));
- FPTreeDepthCache.setFirstLevelCacheSize(Integer.valueOf(params.get("treeCacheSize", Integer
+ maxHeapSize = Integer.valueOf(params.get(PFPGrowth.MAX_HEAPSIZE, "50"));
+ minSupport = Integer.valueOf(params.get(PFPGrowth.MIN_SUPPORT, "3"));
+ FPTreeDepthCache.setFirstLevelCacheSize(Integer.valueOf(params.get(PFPGrowth.TREE_CACHE_SIZE, Integer
.toString(FPTreeDepthCache.getFirstLevelCacheSize()))));
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java?rev=1004466&r1=1004465&r2=1004466&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java Mon Oct 4 22:49:02 2010
@@ -71,14 +71,14 @@ public class TransactionSortingMapper ex
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
+ Parameters params = Parameters.fromString(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
int i = 0;
- for (Pair<String,Long> e : PFPGrowth.deserializeList(params, "fList", context.getConfiguration())) {
+ for (Pair<String,Long> e : PFPGrowth.deserializeList(params, PFPGrowth.F_LIST, context.getConfiguration())) {
fMap.put(e.getFirst(), i++);
}
- splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER.toString()));
+ splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN, PFPGrowth.SPLITTER.toString()));
}
}
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java?rev=1004466&r1=1004465&r2=1004466&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java Mon Oct 4 22:49:02 2010
@@ -40,19 +40,23 @@ public final class PFPGrowthTest extends
private static final Logger log = LoggerFactory.getLogger(PFPGrowthTest.class);
private final Parameters params = new Parameters();
+ private File input;
+ private File inputDir;
+ private File outputDir;
@Override
public void setUp() throws Exception {
super.setUp();
- params.set("minSupport", "3");
- params.set("maxHeapSize", "4");
- params.set("numGroups", "2");
- params.set("encoding", "UTF-8");
- params.set("treeCacheSize", "5");
- File inputDir = getTestTempDir("testdata/transactions");
- File input = new File(inputDir, "test.txt");
- params.set("input", input.getAbsolutePath());
- params.set("output", "output/frequentpatterns");
+ params.set(PFPGrowth.MIN_SUPPORT, "3");
+ params.set(PFPGrowth.MAX_HEAPSIZE, "4");
+ params.set(PFPGrowth.NUM_GROUPS, "2");
+ params.set(PFPGrowth.ENCODING, "UTF-8");
+ params.set(PFPGrowth.TREE_CACHE_SIZE, "5");
+ inputDir = getTestTempDir("transactions");
+ outputDir = getTestTempDir("frequentpatterns");
+ input = new File(inputDir, "test.txt");
+ params.set(PFPGrowth.INPUT, input.getAbsolutePath());
+ params.set(PFPGrowth.OUTPUT, outputDir.getAbsolutePath());
BufferedWriter writer = new BufferedWriter(new FileWriter(input));
try {
Collection<List<String>> transactions = new ArrayList<List<String>>();
@@ -79,31 +83,25 @@ public final class PFPGrowthTest extends
}
@Test
- public void testStartParallelCounting() throws Exception {
- log.info("Starting Parallel Counting Test: {}", params.get("maxHeapSize"));
+ public void testStartParallelFPGrowth() throws Exception {
+ log.info("Starting Parallel Counting Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
PFPGrowth.startParallelCounting(params);
- log.info("Reading fList Test: {}", params.get("maxHeapSize"));
+ log.info("Reading fList Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
List<Pair<String,Long>> fList = PFPGrowth.readFList(params);
log.info("{}", fList);
assertEquals("[(B,6), (D,6), (A,5), (E,4), (C,3)]", fList.toString());
- }
-
- @Test
- public void testStartGroupingItems() throws Exception {
- log.info("Starting Grouping Test: {}", params.get("maxHeapSize"));
+
+ log.info("Starting Grouping Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
PFPGrowth.startGroupingItems(params);
- Map<String,Long> gList = PFPGrowth.deserializeMap(params, "gList", new Configuration());
+ Map<String,Long> gList = PFPGrowth.deserializeMap(params, PFPGrowth.G_LIST, new Configuration());
log.info("{}", gList);
assertEquals("{D=0, E=1, A=0, B=0, C=1}", gList.toString());
- }
-
- @Test
- public void testStartParallelFPGrowth() throws Exception {
- log.info("Starting Parallel FPGrowth Test: {}", params.get("maxHeapSize"));
+
+ log.info("Starting Parallel FPGrowth Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
PFPGrowth.startGroupingItems(params);
PFPGrowth.startTransactionSorting(params);
PFPGrowth.startParallelFPGrowth(params);
- log.info("Starting Pattern Aggregation Test: {}", params.get("maxHeapSize"));
+ log.info("Starting Pattern Aggregation Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
PFPGrowth.startAggregating(params);
List<Pair<String,TopKStringPatterns>> frequentPatterns = PFPGrowth.readFrequentPattern(params);
assertEquals("[(A,([A],5), ([D, A],4), ([B, A],4), ([A, E],4)), "
@@ -113,5 +111,4 @@ public final class PFPGrowthTest extends
+ "(E,([A, E],4), ([D, A, E],3), ([B, A, E],3))]", frequentPatterns.toString());
}
-
}