You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ro...@apache.org on 2010/10/05 00:49:02 UTC

svn commit: r1004466 - in /mahout/trunk/core/src: main/java/org/apache/mahout/fpm/pfpgrowth/ test/java/org/apache/mahout/fpm/pfpgrowth/

Author: robinanil
Date: Mon Oct  4 22:49:02 2010
New Revision: 1004466

URL: http://svn.apache.org/viewvc?rev=1004466&view=rev
Log:
Make hardcoded paths and constants in PFPGrowth final variables and use temp dirs in test

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
    mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
    mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=1004466&r1=1004465&r2=1004466&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Mon Oct  4 22:49:02 2010
@@ -57,6 +57,24 @@ import org.slf4j.LoggerFactory;
  * 
  */
 public final class PFPGrowth {
+  
+  public static final String ENCODING = "encoding";
+  public static final String F_LIST = "fList";
+  public static final String G_LIST = "gList";
+  public static final String NUM_GROUPS = "numGroups";
+  public static final String OUTPUT = "output";
+  public static final String MIN_SUPPORT = "minSupport";
+  public static final String MAX_HEAPSIZE = "maxHeapSize";
+  public static final String INPUT = "input";
+  public static final String PFP_PARAMETERS = "pfp.parameters";
+  public static final String FILE_PATTERN = "part-*";
+  public static final String FPGROWTH = "fpgrowth";
+  public static final String FREQUENT_PATTERNS = "frequentpatterns";
+  public static final String PARALLEL_COUNTING = "parallelcounting";  
+  public static final String SORTED_OUTPUT = "sortedoutput";
+  public static final String SPLIT_PATTERN = "splitPattern";
+  public static final String TREE_CACHE_SIZE = "treeCacheSize";
+  
   public static final Pattern SPLITTER = Pattern.compile("[ ,\t]*[,|\t][ ,\t]*");
   
   private static final Logger log = LoggerFactory.getLogger(PFPGrowth.class);
@@ -116,12 +134,12 @@ public final class PFPGrowth {
   public static List<Pair<String,Long>> readFList(Parameters params) throws IOException {
     Writable key = new Text();
     LongWritable value = new LongWritable();
-    int minSupport = Integer.valueOf(params.get("minSupport", "3"));
+    int minSupport = Integer.valueOf(params.get(MIN_SUPPORT, "3"));
     Configuration conf = new Configuration();
-
-    Path parallelCountingPath = new Path(params.get("output"), "parallelcounting");
+      
+    Path parallelCountingPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING);
     FileSystem fs = FileSystem.get(parallelCountingPath.toUri(), conf);
-    FileStatus[] outputFiles = fs.globStatus(new Path(parallelCountingPath, "part-*"));
+    FileStatus[] outputFiles = fs.globStatus(new Path(parallelCountingPath, FILE_PATTERN));
     
     PriorityQueue<Pair<String,Long>> queue = new PriorityQueue<Pair<String,Long>>(11,
         new Comparator<Pair<String,Long>>() {
@@ -162,9 +180,9 @@ public final class PFPGrowth {
     
     Configuration conf = new Configuration();
 
-    Path frequentPatternsPath = new Path(params.get("output"), "frequentPatterns");
+    Path frequentPatternsPath = new Path(params.get(OUTPUT), FREQUENT_PATTERNS);
     FileSystem fs = FileSystem.get(frequentPatternsPath.toUri(), conf);
-    FileStatus[] outputFiles = fs.globStatus(new Path(frequentPatternsPath, "part-*"));
+    FileStatus[] outputFiles = fs.globStatus(new Path(frequentPatternsPath, FILE_PATTERN));
     
     List<Pair<String,TopKStringPatterns>> ret = new ArrayList<Pair<String,TopKStringPatterns>>();
     for (FileStatus fileStatus : outputFiles) {
@@ -199,21 +217,21 @@ public final class PFPGrowth {
                                                         ClassNotFoundException {
     
     Configuration conf = new Configuration();
-    params.set("fList", "");
-    params.set("gList", "");
-    conf.set("pfp.parameters", params.toString());
+    params.set(F_LIST, "");
+    params.set(G_LIST, "");
+    conf.set(PFP_PARAMETERS, params.toString());
     conf.set("mapred.compress.map.output", "true");
     conf.set("mapred.output.compression.type", "BLOCK");
     
-    String input = params.get("output") + "/fpgrowth";
+    Path input = new Path(params.get(OUTPUT), FPGROWTH);
     Job job = new Job(conf, "PFP Aggregator Driver running over input: " + input);
     job.setJarByClass(PFPGrowth.class);
     
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(TopKStringPatterns.class);
     
-    FileInputFormat.addInputPath(job, new Path(input));
-    Path outPath = new Path(params.get("output"), "frequentPatterns");
+    FileInputFormat.addInputPath(job, input);
+    Path outPath = new Path(params.get(OUTPUT), FREQUENT_PATTERNS);
     FileOutputFormat.setOutputPath(job, outPath);
     
     job.setInputFormatClass(SequenceFileInputFormat.class);
@@ -235,7 +253,7 @@ public final class PFPGrowth {
   public static void startGroupingItems(Parameters params) throws IOException {
     Configuration conf = new Configuration();
     List<Pair<String,Long>> fList = readFList(params);
-    Integer numGroups = Integer.valueOf(params.get("numGroups", "50"));
+    Integer numGroups = Integer.valueOf(params.get(NUM_GROUPS, "50"));
     
     Map<String,Long> gList = new HashMap<String,Long>();
     long maxPerGroup = fList.size() / numGroups;
@@ -258,8 +276,8 @@ public final class PFPGrowth {
     
     log.info("No of Features: {}", fList.size());
     
-    params.set("gList", serializeMap(gList, conf));
-    params.set("fList", serializeList(fList, conf));
+    params.set(G_LIST, serializeMap(gList, conf));
+    params.set(F_LIST, serializeList(fList, conf));
   }
   
   /**
@@ -270,12 +288,12 @@ public final class PFPGrowth {
                                                              ClassNotFoundException {
     
     Configuration conf = new Configuration();
-    conf.set("pfp.parameters", params.toString());
+    conf.set(PFP_PARAMETERS, params.toString());
     
     conf.set("mapred.compress.map.output", "true");
     conf.set("mapred.output.compression.type", "BLOCK");
     
-    String input = params.get("input");
+    String input = params.get(INPUT);
     Job job = new Job(conf, "Parallel Counting Driver running over input: " + input);
     job.setJarByClass(PFPGrowth.class);
     
@@ -283,7 +301,7 @@ public final class PFPGrowth {
     job.setOutputValueClass(LongWritable.class);
     
     FileInputFormat.addInputPath(job, new Path(input));
-    Path outPath = new Path(params.get("output"), "parallelcounting");
+    Path outPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING);
     FileOutputFormat.setOutputPath(job, outPath);
     
     HadoopUtil.overwriteOutput(outPath);
@@ -306,12 +324,12 @@ public final class PFPGrowth {
                                                                ClassNotFoundException {
     
     Configuration conf = new Configuration();
-    String gList = params.get("gList");
-    params.set("gList", "");
-    conf.set("pfp.parameters", params.toString());
+    String gList = params.get(G_LIST);
+    params.set(G_LIST, "");
+    conf.set(PFP_PARAMETERS, params.toString());
     conf.set("mapred.compress.map.output", "true");
     conf.set("mapred.output.compression.type", "BLOCK");
-    String input = params.get("input");
+    String input = params.get(INPUT);
     Job job = new Job(conf, "PFP Transaction Sorting running over input" + input);
     job.setJarByClass(PFPGrowth.class);
     
@@ -322,7 +340,7 @@ public final class PFPGrowth {
     job.setOutputValueClass(TransactionTree.class);
     
     FileInputFormat.addInputPath(job, new Path(input));
-    Path outPath = new Path(params.get("output") + "/sortedoutput");
+    Path outPath = new Path(params.get(OUTPUT), SORTED_OUTPUT);
     FileOutputFormat.setOutputPath(job, outPath);
     
     HadoopUtil.overwriteOutput(outPath);
@@ -333,7 +351,7 @@ public final class PFPGrowth {
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     
     job.waitForCompletion(true);
-    params.set("gList", gList);
+    params.set(G_LIST, gList);
   }
   
   /**
@@ -344,10 +362,10 @@ public final class PFPGrowth {
                                                              ClassNotFoundException {
     
     Configuration conf = new Configuration();
-    conf.set("pfp.parameters", params.toString());
+    conf.set(PFP_PARAMETERS, params.toString());
     conf.set("mapred.compress.map.output", "true");
     conf.set("mapred.output.compression.type", "BLOCK");
-    String input = params.get("output") + "/sortedoutput";
+    Path input = new Path(params.get(OUTPUT), SORTED_OUTPUT);
     Job job = new Job(conf, "PFP Growth Driver running over input" + input);
     job.setJarByClass(PFPGrowth.class);
     
@@ -357,8 +375,8 @@ public final class PFPGrowth {
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(TopKStringPatterns.class);
     
-    FileInputFormat.addInputPath(job, new Path(input));
-    Path outPath = new Path(new Path(params.get("output")), "fpgrowth");
+    FileInputFormat.addInputPath(job, input);
+    Path outPath = new Path(params.get(OUTPUT), FPGROWTH);
     FileOutputFormat.setOutputPath(job, outPath);
     
     HadoopUtil.overwriteOutput(outPath);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java?rev=1004466&r1=1004465&r2=1004466&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelCountingMapper.java Mon Oct  4 22:49:02 2010
@@ -54,7 +54,7 @@ public class ParallelCountingMapper exte
   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
     super.setup(context);
-    Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
-    splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER.toString()));
+    Parameters params = Parameters.fromString(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
+    splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN, PFPGrowth.SPLITTER.toString()));
   }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java?rev=1004466&r1=1004465&r2=1004466&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java Mon Oct  4 22:49:02 2010
@@ -71,15 +71,15 @@ public class ParallelFPGrowthMapper exte
   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
     super.setup(context);
-    Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
+    Parameters params = Parameters.fromString(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
     
     OpenObjectIntHashMap<String> fMap = new OpenObjectIntHashMap<String>();
     int i = 0;
-    for (Pair<String,Long> e : PFPGrowth.deserializeList(params, "fList", context.getConfiguration())) {
+    for (Pair<String,Long> e : PFPGrowth.deserializeList(params, PFPGrowth.F_LIST, context.getConfiguration())) {
       fMap.put(e.getFirst(), i++);
     }
     
-    for (Entry<String,Long> e : PFPGrowth.deserializeMap(params, "gList", context.getConfiguration())
+    for (Entry<String,Long> e : PFPGrowth.deserializeMap(params, PFPGrowth.G_LIST, context.getConfiguration())
         .entrySet()) {
       gListInt.put(fMap.get(e.getKey()), e.getValue());
     }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java?rev=1004466&r1=1004465&r2=1004466&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java Mon Oct  4 22:49:02 2010
@@ -109,16 +109,16 @@ public class ParallelFPGrowthReducer ext
   protected void setup(Context context) throws IOException, InterruptedException {
     
     super.setup(context);
-    Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
+    Parameters params = Parameters.fromString(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
     
     int i = 0;
-    for (Pair<String,Long> e : PFPGrowth.deserializeList(params, "fList", context.getConfiguration())) {
+    for (Pair<String,Long> e : PFPGrowth.deserializeList(params, PFPGrowth.F_LIST, context.getConfiguration())) {
       featureReverseMap.add(e.getFirst());
       fMap.put(e.getFirst(), i++);
       
     }
     
-    Map<String,Long> gList = PFPGrowth.deserializeMap(params, "gList", context.getConfiguration());
+    Map<String,Long> gList = PFPGrowth.deserializeMap(params, PFPGrowth.G_LIST, context.getConfiguration());
     
     for (Entry<String,Long> entry : gList.entrySet()) {
       IntArrayList groupList = groupFeatures.get(entry.getValue());
@@ -132,9 +132,9 @@ public class ParallelFPGrowthReducer ext
       }
       
     }
-    maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50"));
-    minSupport = Integer.valueOf(params.get("minSupport", "3"));
-    FPTreeDepthCache.setFirstLevelCacheSize(Integer.valueOf(params.get("treeCacheSize", Integer
+    maxHeapSize = Integer.valueOf(params.get(PFPGrowth.MAX_HEAPSIZE, "50"));
+    minSupport = Integer.valueOf(params.get(PFPGrowth.MIN_SUPPORT, "3"));
+    FPTreeDepthCache.setFirstLevelCacheSize(Integer.valueOf(params.get(PFPGrowth.TREE_CACHE_SIZE, Integer
         .toString(FPTreeDepthCache.getFirstLevelCacheSize()))));
   }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java?rev=1004466&r1=1004465&r2=1004466&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java Mon Oct  4 22:49:02 2010
@@ -71,14 +71,14 @@ public class TransactionSortingMapper ex
   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
     super.setup(context);
-    Parameters params = Parameters.fromString(context.getConfiguration().get("pfp.parameters", ""));
+    Parameters params = Parameters.fromString(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
     
     int i = 0;
-    for (Pair<String,Long> e : PFPGrowth.deserializeList(params, "fList", context.getConfiguration())) {
+    for (Pair<String,Long> e : PFPGrowth.deserializeList(params, PFPGrowth.F_LIST, context.getConfiguration())) {
       fMap.put(e.getFirst(), i++);
     }
     
-    splitter = Pattern.compile(params.get("splitPattern", PFPGrowth.SPLITTER.toString()));
+    splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN, PFPGrowth.SPLITTER.toString()));
     
   }
 }

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java?rev=1004466&r1=1004465&r2=1004466&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java Mon Oct  4 22:49:02 2010
@@ -40,19 +40,23 @@ public final class PFPGrowthTest extends
   private static final Logger log = LoggerFactory.getLogger(PFPGrowthTest.class);
   
   private final Parameters params = new Parameters();
+  private File input;
+  private File inputDir;
+  private File outputDir;
   
   @Override
   public void setUp() throws Exception {
     super.setUp();
-    params.set("minSupport", "3");
-    params.set("maxHeapSize", "4");
-    params.set("numGroups", "2");
-    params.set("encoding", "UTF-8");
-    params.set("treeCacheSize", "5");
-    File inputDir = getTestTempDir("testdata/transactions");
-    File input = new File(inputDir, "test.txt");
-    params.set("input", input.getAbsolutePath());
-    params.set("output", "output/frequentpatterns");
+    params.set(PFPGrowth.MIN_SUPPORT, "3");
+    params.set(PFPGrowth.MAX_HEAPSIZE, "4");
+    params.set(PFPGrowth.NUM_GROUPS, "2");
+    params.set(PFPGrowth.ENCODING, "UTF-8");
+    params.set(PFPGrowth.TREE_CACHE_SIZE, "5");
+    inputDir = getTestTempDir("transactions");
+    outputDir = getTestTempDir("frequentpatterns");
+    input = new File(inputDir, "test.txt");
+    params.set(PFPGrowth.INPUT, input.getAbsolutePath());
+    params.set(PFPGrowth.OUTPUT, outputDir.getAbsolutePath());
     BufferedWriter writer = new BufferedWriter(new FileWriter(input));
     try {
       Collection<List<String>> transactions = new ArrayList<List<String>>();
@@ -79,31 +83,25 @@ public final class PFPGrowthTest extends
   }
 
   @Test
-  public void testStartParallelCounting() throws Exception {
-    log.info("Starting Parallel Counting Test: {}", params.get("maxHeapSize"));
+  public void testStartParallelFPGrowth() throws Exception {
+    log.info("Starting Parallel Counting Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
     PFPGrowth.startParallelCounting(params);
-    log.info("Reading fList Test: {}", params.get("maxHeapSize"));
+    log.info("Reading fList Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
     List<Pair<String,Long>> fList = PFPGrowth.readFList(params);
     log.info("{}", fList);
     assertEquals("[(B,6), (D,6), (A,5), (E,4), (C,3)]", fList.toString());
-  }
-
-  @Test
-  public void testStartGroupingItems() throws Exception {
-    log.info("Starting Grouping Test: {}", params.get("maxHeapSize"));
+ 
+    log.info("Starting Grouping Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
     PFPGrowth.startGroupingItems(params);
-    Map<String,Long> gList = PFPGrowth.deserializeMap(params, "gList", new Configuration());
+    Map<String,Long> gList = PFPGrowth.deserializeMap(params, PFPGrowth.G_LIST, new Configuration());
     log.info("{}", gList);
     assertEquals("{D=0, E=1, A=0, B=0, C=1}", gList.toString());
-  }
-
-  @Test
-  public void testStartParallelFPGrowth() throws Exception {
-    log.info("Starting Parallel FPGrowth Test: {}", params.get("maxHeapSize"));
+ 
+    log.info("Starting Parallel FPGrowth Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
     PFPGrowth.startGroupingItems(params);
     PFPGrowth.startTransactionSorting(params);
     PFPGrowth.startParallelFPGrowth(params);
-    log.info("Starting Pattern Aggregation Test: {}", params.get("maxHeapSize"));
+    log.info("Starting Pattern Aggregation Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
     PFPGrowth.startAggregating(params);
     List<Pair<String,TopKStringPatterns>> frequentPatterns = PFPGrowth.readFrequentPattern(params);
     assertEquals("[(A,([A],5), ([D, A],4), ([B, A],4), ([A, E],4)), "
@@ -113,5 +111,4 @@ public final class PFPGrowthTest extends
                  + "(E,([A, E],4), ([D, A, E],3), ([B, A, E],3))]", frequentPatterns.toString());
     
   }
-  
 }