You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ss...@apache.org on 2013/03/13 09:16:01 UTC
svn commit: r1455833 - /mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthDriver.java
Author: ssc
Date: Wed Mar 13 08:16:01 2013
New Revision: 1455833
URL: http://svn.apache.org/r1455833
Log:
MAHOUT-1074 FPGrowthDriver only supports input from local file
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthDriver.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthDriver.java?rev=1455833&r1=1455832&r2=1455833&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthDriver.java Wed Mar 13 08:16:01 2013
@@ -17,8 +17,10 @@
package org.apache.mahout.fpm.pfpgrowth;
+import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
@@ -144,29 +146,34 @@ public final class FPGrowthDriver extend
int maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50"));
int minSupport = Integer.valueOf(params.get("minSupport", "3"));
- String output = params.get("output", "output.txt");
+ Path output = new Path(params.get("output", "output.txt"));
+ Path input = new Path(params.get("input"));
- Path path = new Path(output);
Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(path.toUri(), conf);
+ FileSystem fs = FileSystem.get(output.toUri(), conf);
Charset encoding = Charset.forName(params.get("encoding"));
- String input = params.get("input");
String pattern = params.get("splitPattern", PFPGrowth.SPLITTER.toString());
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, TopKStringPatterns.class);
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, output, Text.class, TopKStringPatterns.class);
+
+ FSDataInputStream inputStream = null;
+ FSDataInputStream inputStreamAgain = null;
+
+ Collection<String> features = Sets.newHashSet();
if ("true".equals(params.get(PFPGrowth.USE_FPG2))) {
org.apache.mahout.fpm.pfpgrowth.fpgrowth2.FPGrowthObj<String> fp
= new org.apache.mahout.fpm.pfpgrowth.fpgrowth2.FPGrowthObj<String>();
- Collection<String> features = new HashSet<String>();
try {
+ inputStream = fs.open(input);
+ inputStreamAgain = fs.open(input);
fp.generateTopKFrequentPatterns(
- new StringRecordIterator(new FileLineIterable(new File(input), encoding, false), pattern),
+ new StringRecordIterator(new FileLineIterable(inputStream, encoding, false), pattern),
fp.generateFList(
- new StringRecordIterator(new FileLineIterable(new File(input), encoding, false), pattern),
+ new StringRecordIterator(new FileLineIterable(inputStreamAgain, encoding, false), pattern),
minSupport),
minSupport,
maxHeapSize,
@@ -175,15 +182,20 @@ public final class FPGrowthDriver extend
new ContextStatusUpdater(null));
} finally {
Closeables.closeQuietly(writer);
+ Closeables.closeQuietly(inputStream);
+ Closeables.closeQuietly(inputStreamAgain);
}
} else {
FPGrowth<String> fp = new FPGrowth<String>();
- Collection<String> features = new HashSet<String>();
+
+
+ inputStream = fs.open(input);
+ inputStreamAgain = fs.open(input);
try {
fp.generateTopKFrequentPatterns(
- new StringRecordIterator(new FileLineIterable(new File(input), encoding, false), pattern),
+ new StringRecordIterator(new FileLineIterable(inputStream, encoding, false), pattern),
fp.generateFList(
- new StringRecordIterator(new FileLineIterable(new File(input), encoding, false), pattern),
+ new StringRecordIterator(new FileLineIterable(inputStreamAgain, encoding, false), pattern),
minSupport),
minSupport,
maxHeapSize,
@@ -192,11 +204,12 @@ public final class FPGrowthDriver extend
new ContextStatusUpdater(null));
} finally {
Closeables.closeQuietly(writer);
+ Closeables.closeQuietly(inputStream);
+ Closeables.closeQuietly(inputStreamAgain);
}
- }
-
+ }
- List<Pair<String, TopKStringPatterns>> frequentPatterns = FPGrowth.readFrequentPattern(conf, path);
+ List<Pair<String, TopKStringPatterns>> frequentPatterns = FPGrowth.readFrequentPattern(conf, output);
for (Pair<String, TopKStringPatterns> entry : frequentPatterns) {
log.info("Dumping Patterns for Feature: {} \n{}", entry.getFirst(), entry.getSecond());
}