You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@mahout.apache.org by ss...@apache.org on 2013/03/13 09:16:01 UTC

svn commit: r1455833 - /mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthDriver.java

Author: ssc
Date: Wed Mar 13 08:16:01 2013
New Revision: 1455833

URL: http://svn.apache.org/r1455833
Log:
MAHOUT-1074: FPGrowthDriver only supports input from local file (fixed: the input is now opened through the Hadoop FileSystem API instead of java.io.File)

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthDriver.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthDriver.java?rev=1455833&r1=1455832&r2=1455833&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthDriver.java Wed Mar 13 08:16:01 2013
@@ -17,8 +17,10 @@
 
 package org.apache.mahout.fpm.pfpgrowth;
 
+import com.google.common.collect.Sets;
 import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -144,29 +146,34 @@ public final class FPGrowthDriver extend
     int maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50"));
     int minSupport = Integer.valueOf(params.get("minSupport", "3"));
 
-    String output = params.get("output", "output.txt");
+    Path output = new Path(params.get("output", "output.txt"));
+    Path input = new Path(params.get("input"));
 
-    Path path = new Path(output);
     Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    FileSystem fs = FileSystem.get(output.toUri(), conf);
 
     Charset encoding = Charset.forName(params.get("encoding"));
-    String input = params.get("input");
 
     String pattern = params.get("splitPattern", PFPGrowth.SPLITTER.toString());
 
-    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, TopKStringPatterns.class);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, output, Text.class, TopKStringPatterns.class);
+
+    FSDataInputStream inputStream = null;
+    FSDataInputStream inputStreamAgain = null;
+
+    Collection<String> features = Sets.newHashSet();
 
     if ("true".equals(params.get(PFPGrowth.USE_FPG2))) {
       org.apache.mahout.fpm.pfpgrowth.fpgrowth2.FPGrowthObj<String> fp 
         = new org.apache.mahout.fpm.pfpgrowth.fpgrowth2.FPGrowthObj<String>();
-      Collection<String> features = new HashSet<String>();
 
       try {
+        inputStream = fs.open(input);
+        inputStreamAgain = fs.open(input);
         fp.generateTopKFrequentPatterns(
-                new StringRecordIterator(new FileLineIterable(new File(input), encoding, false), pattern),
+                new StringRecordIterator(new FileLineIterable(inputStream, encoding, false), pattern),
                 fp.generateFList(
-                        new StringRecordIterator(new FileLineIterable(new File(input), encoding, false), pattern),
+                        new StringRecordIterator(new FileLineIterable(inputStreamAgain, encoding, false), pattern),
                         minSupport),
                 minSupport,
                 maxHeapSize,
@@ -175,15 +182,20 @@ public final class FPGrowthDriver extend
                 new ContextStatusUpdater(null));
       } finally {
         Closeables.closeQuietly(writer);
+        Closeables.closeQuietly(inputStream);
+        Closeables.closeQuietly(inputStreamAgain);
       }
     } else {
       FPGrowth<String> fp = new FPGrowth<String>();
-      Collection<String> features = new HashSet<String>();
+
+
+      inputStream = fs.open(input);
+      inputStreamAgain = fs.open(input);
       try {
         fp.generateTopKFrequentPatterns(
-                new StringRecordIterator(new FileLineIterable(new File(input), encoding, false), pattern),
+                new StringRecordIterator(new FileLineIterable(inputStream, encoding, false), pattern),
                 fp.generateFList(
-                        new StringRecordIterator(new FileLineIterable(new File(input), encoding, false), pattern),
+                        new StringRecordIterator(new FileLineIterable(inputStreamAgain, encoding, false), pattern),
                         minSupport),
                 minSupport,
                 maxHeapSize,
@@ -192,11 +204,12 @@ public final class FPGrowthDriver extend
                 new ContextStatusUpdater(null));
       } finally {
         Closeables.closeQuietly(writer);
+        Closeables.closeQuietly(inputStream);
+        Closeables.closeQuietly(inputStreamAgain);
       }
-    } 
-
+    }
 
-    List<Pair<String, TopKStringPatterns>> frequentPatterns = FPGrowth.readFrequentPattern(conf, path);
+    List<Pair<String, TopKStringPatterns>> frequentPatterns = FPGrowth.readFrequentPattern(conf, output);
     for (Pair<String, TopKStringPatterns> entry : frequentPatterns) {
       log.info("Dumping Patterns for Feature: {} \n{}", entry.getFirst(), entry.getSecond());
     }