Posted to commits@mahout.apache.org by gs...@apache.org on 2013/06/09 14:17:45 UTC
svn commit: r1491191 - in /mahout/trunk:
core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/
core/src/main/java/org/apache/mahout/vectorizer/
core/src/main/java/org/apache/mahout/vectorizer/term/
core/src/main/java/org/apache/mahout/...
Author: gsingers
Date: Sun Jun 9 12:17:45 2013
New Revision: 1491191
URL: http://svn.apache.org/r1491191
Log:
MAHOUT-1103: properly partition the data for MapReduce
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
mahout/trunk/examples/bin/cluster-reuters.sh
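
For context: the substance of MAHOUT-1103 below is that the MapReduce path of the cluster post-processor now
keys its map output with an IntWritable index in the range 0..k-1 (k being the number of final clusters read
via ClusterCountReader) and sets the number of reduce tasks to k, so Hadoop's default HashPartitioner routes
each cluster to its own reducer and hence its own part file. A minimal, self-contained sketch of that
partitioning arithmetic, not part of the diff; the class name and cluster count are illustrative only:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

    public class PartitionDemo {
      public static void main(String[] args) {
        int numClusters = 5; // hypothetical k, one reducer per cluster
        HashPartitioner<IntWritable, NullWritable> partitioner = new HashPartitioner<IntWritable, NullWritable>();
        for (int index = 0; index < numClusters; index++) {
          // contiguous indices 0..k-1 land on distinct reducers when numReduceTasks == k
          int reducer = partitioner.getPartition(new IntWritable(index), NullWritable.get(), numClusters);
          System.out.println("cluster index " + index + " -> reducer " + reducer);
        }
      }
    }

With contiguous indices and numReduceTasks == k, getPartition(i) == i, which is what gives the
one-cluster-per-part-file layout the log message refers to.
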
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java Sun Jun 9 12:17:45 2013
@@ -17,18 +17,21 @@
package org.apache.mahout.clustering.topdown.postprocessor;
-import java.io.IOException;
-import java.util.Iterator;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
/**
* Reads the number of clusters produced by the clustering algorithm.
*/
@@ -39,11 +42,9 @@ public final class ClusterCountReader {
/**
* Reads the number of clusters present by reading the clusters-*-final file.
- *
- * @param clusterOutputPath
- * The output path provided to the clustering algorithm.
- * @param conf
- * The hadoop configuration.
+ *
+ * @param clusterOutputPath The output path provided to the clustering algorithm.
+ * @param conf The hadoop configuration.
* @return the number of final clusters.
*/
public static int getNumberOfClusters(Path clusterOutputPath, Configuration conf) throws IOException {
@@ -51,11 +52,11 @@ public final class ClusterCountReader {
FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
int numberOfClusters = 0;
Iterator<?> it = new SequenceFileDirValueIterator<Writable>(clusterFiles[0].getPath(),
- PathType.LIST,
- PathFilters.partFilter(),
- null,
- true,
- conf);
+ PathType.LIST,
+ PathFilters.partFilter(),
+ null,
+ true,
+ conf);
while (it.hasNext()) {
it.next();
numberOfClusters++;
@@ -63,4 +64,38 @@ public final class ClusterCountReader {
return numberOfClusters;
}
+ /**
+ * Generates a mapping of all final cluster ids by reading the clusters-*-final file.
+ *
+ * @param clusterOutputPath The output path provided to the clustering algorithm.
+ * @param conf The hadoop configuration.
+ * @param keyIsClusterId If true, the map is keyed by cluster id with a contiguous index as value; if false, the reverse.
+ * @return A Map between the final cluster ids and their indices.
+ */
+ public static Map<Integer, Integer> getClusterIDs(Path clusterOutputPath, Configuration conf, boolean keyIsClusterId) throws IOException {
+ Map<Integer, Integer> clusterIds = new HashMap<Integer, Integer>();
+ FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
+ FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
+ //System.out.println("LOOK HERE: " + clusterOutputPath);
+ Iterator<ClusterWritable> it = new SequenceFileDirValueIterator<ClusterWritable>(clusterFiles[0].getPath(),
+ PathType.LIST,
+ PathFilters.partFilter(),
+ null,
+ true,
+ conf);
+ int i = 0;
+ while (it.hasNext()) {
+ Integer key, value;
+ if (keyIsClusterId == true) { // key is the cluster id, value is i, the index we will use
+ key = it.next().getValue().getId();
+ value = i;
+ } else {
+ key = i;
+ value = it.next().getValue().getId();
+ }
+ clusterIds.put(key, value);
+ i++;
+ }
+ return clusterIds;
+ }
+
}
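
A hedged usage sketch of the new ClusterCountReader helpers added above; only the method signatures come from
the diff, while the paths and class name are hypothetical:

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.clustering.topdown.postprocessor.ClusterCountReader;

    public class ClusterIdLookupExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path clusterOutputPath = new Path("/tmp/kmeans-output"); // hypothetical clustering output directory
        int k = ClusterCountReader.getNumberOfClusters(clusterOutputPath, conf);
        // cluster id -> index: what the post-processor mapper uses to emit contiguous keys 0..k-1
        Map<Integer, Integer> idToIndex = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, true);
        // index -> cluster id: what the reducer uses to restore the original ids
        Map<Integer, Integer> indexToId = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, false);
        System.out.println(k + " clusters; " + idToIndex + " / " + indexToId);
      }
    }
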
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java Sun Jun 9 12:17:45 2013
@@ -17,10 +17,6 @@
package org.apache.mahout.clustering.topdown.postprocessor;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,77 +33,80 @@ import org.apache.mahout.common.iterator
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
import org.apache.mahout.math.VectorWritable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* This class reads the output of any clustering algorithm, and, creates separate directories for different
* clusters. Each cluster directory's name is its clusterId. Each and every point is written in the cluster
* directory associated with that point.
- *
+ * <p/>
* This class incorporates a sequential algorithm and is appropriate for use for data which has been clustered
* sequentially.
- *
+ * <p/>
* The sequential and non sequential version, both are being used from {@link ClusterOutputPostProcessorDriver}.
*/
public final class ClusterOutputPostProcessor {
-
+
private Path clusteredPoints;
private final FileSystem fileSystem;
private final Configuration conf;
private final Path clusterPostProcessorOutput;
- private final Map<String,Path> postProcessedClusterDirectories = new HashMap<String,Path>();
+ private final Map<String, Path> postProcessedClusterDirectories = new HashMap<String, Path>();
private long uniqueVectorId = 0L;
- private final Map<String,SequenceFile.Writer> writersForClusters;
-
+ private final Map<String, SequenceFile.Writer> writersForClusters;
+
public ClusterOutputPostProcessor(Path clusterOutputToBeProcessed,
Path output,
Configuration hadoopConfiguration) throws IOException {
this.clusterPostProcessorOutput = output;
this.clusteredPoints = PathDirectory.getClusterOutputClusteredPoints(clusterOutputToBeProcessed);
this.conf = hadoopConfiguration;
- this.writersForClusters = new HashMap<String,SequenceFile.Writer>();
- fileSystem = clusteredPoints.getFileSystem(conf);
+ this.writersForClusters = new HashMap<String, SequenceFile.Writer>();
+ fileSystem = clusteredPoints.getFileSystem(conf);
}
-
+
/**
* This method takes the clustered points output by the clustering algorithms as input and writes them into
* their respective clusters.
*/
public void process() throws IOException {
createPostProcessDirectory();
- for (Pair<?,WeightedVectorWritable> record
- : new SequenceFileDirIterable<Writable,WeightedVectorWritable>(clusteredPoints,
- PathType.GLOB,
- PathFilters.partFilter(),
- null,
- false,
- conf)) {
+ for (Pair<?, WeightedVectorWritable> record
+ : new SequenceFileDirIterable<Writable, WeightedVectorWritable>(clusteredPoints,
+ PathType.GLOB,
+ PathFilters.partFilter(),
+ null,
+ false,
+ conf)) {
String clusterId = record.getFirst().toString().trim();
putVectorInRespectiveCluster(clusterId, record.getSecond());
}
IOUtils.close(writersForClusters.values());
writersForClusters.clear();
}
-
+
/**
* Creates the directory to put post processed clusters.
*/
private void createPostProcessDirectory() throws IOException {
if (!fileSystem.exists(clusterPostProcessorOutput)
- && !fileSystem.mkdirs(clusterPostProcessorOutput)) {
+ && !fileSystem.mkdirs(clusterPostProcessorOutput)) {
throw new IOException("Error creating cluster post processor directory");
}
}
-
+
/**
- *
* Finds out the cluster directory of the vector and writes it into the specified cluster.
*/
private void putVectorInRespectiveCluster(String clusterId, WeightedVectorWritable point) throws IOException {
Writer writer = findWriterForVector(clusterId);
postProcessedClusterDirectories.put(clusterId,
- PathDirectory.getClusterPathForClusterId(clusterPostProcessorOutput, clusterId));
+ PathDirectory.getClusterPathForClusterId(clusterPostProcessorOutput, clusterId));
writeVectorToCluster(writer, point);
}
-
+
/**
* Finds out the path in cluster where the point is supposed to be written.
*/
@@ -121,7 +120,7 @@ public final class ClusterOutputPostProc
}
return writer;
}
-
+
/**
* Writes vector to the cluster directory.
*/
@@ -129,16 +128,16 @@ public final class ClusterOutputPostProc
writer.append(new LongWritable(uniqueVectorId++), new VectorWritable(point.getVector()));
writer.sync();
}
-
+
/**
* @return the set of all post processed cluster paths.
*/
- public Map<String,Path> getPostProcessedClusterDirectories() {
+ public Map<String, Path> getPostProcessedClusterDirectories() {
return postProcessedClusterDirectories;
}
-
+
public void setClusteredPoints(Path clusteredPoints) {
this.clusteredPoints = clusteredPoints;
}
-
+
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java Sun Jun 9 12:17:45 2013
@@ -17,13 +17,11 @@
package org.apache.mahout.clustering.topdown.postprocessor;
-import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -32,71 +30,74 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
import org.apache.mahout.math.VectorWritable;
+import java.io.IOException;
+
/**
* Post processes the output of clustering algorithms and groups them into respective clusters. Ideal to be
* used for top down clustering. It can also be used if the clustering output needs to be grouped into their
* respective clusters.
*/
public final class ClusterOutputPostProcessorDriver extends AbstractJob {
-
+
/**
* CLI to run clustering post processor. The input to post processor is the ouput path specified to the
* clustering.
*/
@Override
public int run(String[] args) throws Exception {
-
addInputOption();
addOutputOption();
addOption(DefaultOptionCreator.methodOption().create());
+ addOption(DefaultOptionCreator.overwriteOption().create());
if (parseArguments(args) == null) {
return -1;
}
-
Path input = getInputPath();
Path output = getOutputPath();
if (getConf() == null) {
setConf(new Configuration());
}
+ if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+ HadoopUtil.delete(getConf(), output);
+ }
boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
- DefaultOptionCreator.SEQUENTIAL_METHOD);
+ DefaultOptionCreator.SEQUENTIAL_METHOD);
run(input, output, runSequential);
return 0;
-
+
}
-
+
/**
* Constructor to be used by the ToolRunner.
*/
- private ClusterOutputPostProcessorDriver() {}
-
+ private ClusterOutputPostProcessorDriver() {
+ }
+
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new ClusterOutputPostProcessorDriver(), args);
}
-
+
/**
* Post processes the output of clustering algorithms and groups them into respective clusters. Each
* cluster's vectors are written into a directory named after its clusterId.
- *
- * @param input
- * The output path provided to the clustering algorithm, whose would be post processed. Hint : The
- * path of the directory containing clusters-*-final and clusteredPoints.
- * @param output
- * The post processed data would be stored at this path.
- * @param runSequential
- * If set to true, post processes it sequentially, else, uses. MapReduce. Hint : If the clustering
- * was done sequentially, make it sequential, else vice versa.
+ *
+ * @param input The output path provided to the clustering algorithm, whose would be post processed. Hint : The
+ * path of the directory containing clusters-*-final and clusteredPoints.
+ * @param output The post processed data would be stored at this path.
+ * @param runSequential If set to true, post processes it sequentially, else, uses. MapReduce. Hint : If the clustering
+ * was done sequentially, make it sequential, else vice versa.
*/
public static void run(Path input, Path output, boolean runSequential) throws IOException,
- InterruptedException,
- ClassNotFoundException {
+ InterruptedException,
+ ClassNotFoundException {
if (runSequential) {
postProcessSeq(input, output);
} else {
@@ -104,81 +105,76 @@ public final class ClusterOutputPostProc
postProcessMR(conf, input, output);
movePartFilesToRespectiveDirectories(conf, output);
}
-
+
}
-
+
/**
* Process Sequentially. Reads the vectors one by one, and puts them into respective directory, named after
* their clusterId.
- *
- * @param input
- * The output path provided to the clustering algorithm, whose would be post processed. Hint : The
- * path of the directory containing clusters-*-final and clusteredPoints.
- * @param output
- * The post processed data would be stored at this path.
+ *
+ * @param input The output path provided to the clustering algorithm, whose would be post processed. Hint : The
+ * path of the directory containing clusters-*-final and clusteredPoints.
+ * @param output The post processed data would be stored at this path.
*/
private static void postProcessSeq(Path input, Path output) throws IOException {
ClusterOutputPostProcessor clusterOutputPostProcessor = new ClusterOutputPostProcessor(input, output,
- new Configuration());
+ new Configuration());
clusterOutputPostProcessor.process();
}
-
+
/**
* Process as a map reduce job. The numberOfReduceTasks is set to the number of clusters present in the
* output. So that each cluster's vector is written in its own part file.
- *
- * @param conf
- * The hadoop configuration.
- * @param input
- * The output path provided to the clustering algorithm, whose would be post processed. Hint : The
- * path of the directory containing clusters-*-final and clusteredPoints.
- * @param output
- * The post processed data would be stored at this path.
+ *
+ * @param conf The hadoop configuration.
+ * @param input The output path provided to the clustering algorithm, whose would be post processed. Hint : The
+ * path of the directory containing clusters-*-final and clusteredPoints.
+ * @param output The post processed data would be stored at this path.
*/
private static void postProcessMR(Configuration conf, Path input, Path output) throws IOException,
- InterruptedException,
- ClassNotFoundException {
+ InterruptedException,
+ ClassNotFoundException {
+ System.out.println("WARNING: If you are running in Hadoop local mode, please use the --sequential option, as the MapReduce option will not work properly");
+ int numberOfClusters = ClusterCountReader.getNumberOfClusters(input, conf);
+ conf.set("clusterOutputPath", input.toString());
Job job = new Job(conf, "ClusterOutputPostProcessor Driver running over input: " + input);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(ClusterOutputPostProcessorMapper.class);
- job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(VectorWritable.class);
job.setReducerClass(ClusterOutputPostProcessorReducer.class);
- job.setOutputKeyClass(Text.class);
+ job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(VectorWritable.class);
- int numberOfClusters = ClusterCountReader.getNumberOfClusters(input, conf);
job.setNumReduceTasks(numberOfClusters);
job.setJarByClass(ClusterOutputPostProcessorDriver.class);
-
+
FileInputFormat.addInputPath(job, new Path(input, new Path("clusteredPoints")));
FileOutputFormat.setOutputPath(job, output);
if (!job.waitForCompletion(true)) {
throw new InterruptedException("ClusterOutputPostProcessor Job failed processing " + input);
}
}
-
+
/**
* The mapreduce version of the post processor writes different clusters into different part files. This
* method reads the part files and moves them into directories named after their clusterIds.
- *
- * @param conf
- * The hadoop configuration.
- * @param output
- * The post processed data would be stored at this path.
+ *
+ * @param conf The hadoop configuration.
+ * @param output The post processed data would be stored at this path.
*/
private static void movePartFilesToRespectiveDirectories(Configuration conf, Path output) throws IOException {
FileSystem fileSystem = output.getFileSystem(conf);
for (FileStatus fileStatus : fileSystem.listStatus(output, PathFilters.partFilter())) {
- SequenceFileIterator<Writable,Writable> it =
- new SequenceFileIterator<Writable,Writable>(fileStatus.getPath(), true, conf);
+ SequenceFileIterator<Writable, Writable> it =
+ new SequenceFileIterator<Writable, Writable>(fileStatus.getPath(), true, conf);
if (it.hasNext()) {
renameFile(it.next().getFirst(), fileStatus, conf);
}
it.close();
}
}
-
+
/**
* Using @FileSystem rename method to move the file.
*/
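
A minimal sketch of driving the post-processor programmatically, assuming hypothetical input and output
paths; ClusterOutputPostProcessorDriver.run(Path, Path, boolean) is the public entry point shown above:

    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.clustering.topdown.postprocessor.ClusterOutputPostProcessorDriver;

    public class PostProcessExample {
      public static void main(String[] args) throws Exception {
        // hypothetical paths: the clustering output (containing clusters-*-final and clusteredPoints)
        // and a fresh directory for the per-cluster result
        Path clusteringOutput = new Path("/tmp/kmeans-output");
        Path postProcessed = new Path("/tmp/kmeans-postprocessed");
        // false selects the MapReduce version, which now runs one reducer per cluster (see postProcessMR)
        ClusterOutputPostProcessorDriver.run(clusteringOutput, postProcessed, false);
      }
    }
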
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java Sun Jun 9 12:17:45 2013
@@ -17,26 +17,41 @@
package org.apache.mahout.clustering.topdown.postprocessor;
-import java.io.IOException;
-
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.clustering.classify.WeightedVectorWritable;
import org.apache.mahout.math.VectorWritable;
+import java.io.IOException;
+import java.util.Map;
+
/**
* Mapper for post processing cluster output.
*/
public class ClusterOutputPostProcessorMapper extends
- Mapper<IntWritable,WeightedVectorWritable,Text,VectorWritable> {
-
- /**
- * The key is the cluster id and the value is the vector.
- */
+ Mapper<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> {
+
+ private Map<Integer, Integer> newClusterMappings;
+ private VectorWritable outputVector;
+
+ //read the current cluster ids, and populate the cluster mapping hash table
+ @Override
+ public void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ //this gives the clusters-x-final directory where the cluster ids can be read
+ Path clusterOutputPath = new Path(conf.get("clusterOutputPath"));
+ //we want the key to be the cluster id, the value to be the index
+ newClusterMappings = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, true);
+ outputVector = new VectorWritable();
+ }
+
@Override
- protected void map(IntWritable key, WeightedVectorWritable vector, Context context) throws IOException,
- InterruptedException {
- context.write(new Text(key.toString().trim()), new VectorWritable(vector.getVector()));
+ public void map(IntWritable key, WeightedVectorWritable val, Context context) throws IOException, InterruptedException {
+ //by pivoting on the cluster mapping value, we can make sure that each unique cluster goes to its own reducer, since they
+ //are numbered from 0 to k-1, where k is the number of clusters
+ outputVector.set(val.getVector());
+ context.write(new IntWritable(newClusterMappings.get(key.get())), outputVector);
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java Sun Jun 9 12:17:45 2013
@@ -17,25 +17,46 @@
package org.apache.mahout.clustering.topdown.postprocessor;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VectorWritable;
+import java.io.IOException;
+import java.util.Map;
+
/**
* Reducer for post processing cluster output.
*/
-public class ClusterOutputPostProcessorReducer extends Reducer<Text,VectorWritable,Text,VectorWritable> {
+public class ClusterOutputPostProcessorReducer extends Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+
+ private Map<Integer, Integer> reverseClusterMappings;
+
+ //read the current cluster ids, and populate the hash cluster mapping hash table
+ @Override
+ public void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ Path clusterOutputPath = new Path(conf.get("clusterOutputPath"));
+ //we want the key to be the index, the value to be the cluster id
+ reverseClusterMappings = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, false);
+ }
+
/**
- * The key is the cluster id and the values contains the points in that cluster.
+ * The key is the remapped cluster id and the values contains the vectors in that cluster.
*/
@Override
- protected void reduce(Text key, Iterable<VectorWritable> values, Context context) throws IOException,
- InterruptedException {
+ protected void reduce(IntWritable key, Iterable<VectorWritable> values, Context context) throws IOException,
+ InterruptedException {
+ //remap the cluster back to its original id
+ //and then output the vectors with their correct
+ //cluster id.
+ IntWritable outKey = new IntWritable(reverseClusterMappings.get(key.get()));
+ System.out.println(outKey + " this: " + this);
for (VectorWritable value : values) {
- context.write(key, value);
+ context.write(outKey, value);
}
}
-
+
}
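
To make the mapper/reducer hand-off above concrete, a small stand-alone sketch of the id -> index -> id
round trip; the cluster ids are made up, and in the real job both maps come from
ClusterCountReader.getClusterIDs:

    import java.util.HashMap;
    import java.util.Map;

    public class ClusterIdRoundTripDemo {
      public static void main(String[] args) {
        int[] originalIds = {100, 205, 317}; // hypothetical ids assigned by the clustering job
        Map<Integer, Integer> idToIndex = new HashMap<Integer, Integer>(); // mapper side (keyIsClusterId == true)
        Map<Integer, Integer> indexToId = new HashMap<Integer, Integer>(); // reducer side (keyIsClusterId == false)
        for (int i = 0; i < originalIds.length; i++) {
          idToIndex.put(originalIds[i], i);
          indexToId.put(i, originalIds[i]);
        }
        for (int id : originalIds) {
          int reducerKey = idToIndex.get(id);         // what the mapper emits (0..k-1)
          int restoredId = indexToId.get(reducerKey); // what the reducer writes back out
          System.out.println(id + " -> reducer key " + reducerKey + " -> " + restoredId);
        }
      }
    }
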
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java Sun Jun 9 12:17:45 2013
@@ -52,6 +52,8 @@ import org.apache.mahout.vectorizer.term
import org.apache.mahout.vectorizer.term.TermCountCombiner;
import org.apache.mahout.vectorizer.term.TermCountMapper;
import org.apache.mahout.vectorizer.term.TermCountReducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class converts a set of input documents in the sequence file format to vectors. The Sequence file
@@ -60,6 +62,7 @@ import org.apache.mahout.vectorizer.term
* This is a dictionary based Vectorizer.
*/
public final class DictionaryVectorizer implements Vectorizer {
+ private static Logger log = LoggerFactory.getLogger(DictionaryVectorizer.class);
public static final String DOCUMENT_VECTOR_OUTPUT_FOLDER = "tf-vectors";
public static final String MIN_SUPPORT = "min.support";
@@ -167,6 +170,7 @@ public final class DictionaryVectorizer
int[] maxTermDimension = new int[1];
List<Path> dictionaryChunks;
+ log.info("Creating dictionary from {} and saving at {}", input, dictionaryJobPath);
if (maxNGramSize == 1) {
startWordCounting(input, dictionaryJobPath, baseConf, minSupport);
dictionaryChunks =
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java Sun Jun 9 12:17:45 2013
@@ -17,8 +17,6 @@
package org.apache.mahout.vectorizer;
-import java.util.List;
-
import org.apache.commons.cli2.CommandLine;
import org.apache.commons.cli2.Group;
import org.apache.commons.cli2.Option;
@@ -45,121 +43,123 @@ import org.apache.mahout.vectorizer.tfid
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
/**
* Converts a given set of sequence files into SparseVectors
*/
public final class SparseVectorsFromSequenceFiles extends AbstractJob {
-
+
private static final Logger log = LoggerFactory.getLogger(SparseVectorsFromSequenceFiles.class);
-
+
public static void main(String[] args) throws Exception {
ToolRunner.run(new SparseVectorsFromSequenceFiles(), args);
}
-
+
@Override
public int run(String[] args) throws Exception {
DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
ArgumentBuilder abuilder = new ArgumentBuilder();
GroupBuilder gbuilder = new GroupBuilder();
-
+
Option inputDirOpt = DefaultOptionCreator.inputOption().create();
-
+
Option outputDirOpt = DefaultOptionCreator.outputOption().create();
-
+
Option minSupportOpt = obuilder.withLongName("minSupport").withArgument(
- abuilder.withName("minSupport").withMinimum(1).withMaximum(1).create()).withDescription(
- "(Optional) Minimum Support. Default Value: 2").withShortName("s").create();
-
+ abuilder.withName("minSupport").withMinimum(1).withMaximum(1).create()).withDescription(
+ "(Optional) Minimum Support. Default Value: 2").withShortName("s").create();
+
Option analyzerNameOpt = obuilder.withLongName("analyzerName").withArgument(
- abuilder.withName("analyzerName").withMinimum(1).withMaximum(1).create()).withDescription(
- "The class name of the analyzer").withShortName("a").create();
-
+ abuilder.withName("analyzerName").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The class name of the analyzer").withShortName("a").create();
+
Option chunkSizeOpt = obuilder.withLongName("chunkSize").withArgument(
- abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
- "The chunkSize in MegaBytes. 100-10000 MB").withShortName("chunk").create();
-
+ abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The chunkSize in MegaBytes. 100-10000 MB").withShortName("chunk").create();
+
Option weightOpt = obuilder.withLongName("weight").withRequired(false).withArgument(
- abuilder.withName("weight").withMinimum(1).withMaximum(1).create()).withDescription(
- "The kind of weight to use. Currently TF or TFIDF").withShortName("wt").create();
-
+ abuilder.withName("weight").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The kind of weight to use. Currently TF or TFIDF").withShortName("wt").create();
+
Option minDFOpt = obuilder.withLongName("minDF").withRequired(false).withArgument(
- abuilder.withName("minDF").withMinimum(1).withMaximum(1).create()).withDescription(
- "The minimum document frequency. Default is 1").withShortName("md").create();
+ abuilder.withName("minDF").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The minimum document frequency. Default is 1").withShortName("md").create();
Option maxDFPercentOpt = obuilder.withLongName("maxDFPercent").withRequired(false).withArgument(
- abuilder.withName("maxDFPercent").withMinimum(1).withMaximum(1).create()).withDescription(
- "The max percentage of docs for the DF. Can be used to remove really high frequency terms."
- + " Expressed as an integer between 0 and 100. Default is 99. If maxDFSigma is also set, "
- + "it will override this value.").withShortName("x").create();
+ abuilder.withName("maxDFPercent").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The max percentage of docs for the DF. Can be used to remove really high frequency terms."
+ + " Expressed as an integer between 0 and 100. Default is 99. If maxDFSigma is also set, "
+ + "it will override this value.").withShortName("x").create();
Option maxDFSigmaOpt = obuilder.withLongName("maxDFSigma").withRequired(false).withArgument(
- abuilder.withName("maxDFSigma").withMinimum(1).withMaximum(1).create()).withDescription(
- "What portion of the tf (tf-idf) vectors to be used, expressed in times the standard deviation (sigma) "
- + "of the document frequencies of these vectors. Can be used to remove really high frequency terms."
- + " Expressed as a double value. Good value to be specified is 3.0. In case the value is less than 0 "
- + "no vectors will be filtered out. Default is -1.0. Overrides maxDFPercent").withShortName("xs").create();
-
+ abuilder.withName("maxDFSigma").withMinimum(1).withMaximum(1).create()).withDescription(
+ "What portion of the tf (tf-idf) vectors to be used, expressed in times the standard deviation (sigma) "
+ + "of the document frequencies of these vectors. Can be used to remove really high frequency terms."
+ + " Expressed as a double value. Good value to be specified is 3.0. In case the value is less than 0 "
+ + "no vectors will be filtered out. Default is -1.0. Overrides maxDFPercent").withShortName("xs").create();
+
Option minLLROpt = obuilder.withLongName("minLLR").withRequired(false).withArgument(
- abuilder.withName("minLLR").withMinimum(1).withMaximum(1).create()).withDescription(
- "(Optional)The minimum Log Likelihood Ratio(Float) Default is " + LLRReducer.DEFAULT_MIN_LLR)
- .withShortName("ml").create();
-
+ abuilder.withName("minLLR").withMinimum(1).withMaximum(1).create()).withDescription(
+ "(Optional)The minimum Log Likelihood Ratio(Float) Default is " + LLRReducer.DEFAULT_MIN_LLR)
+ .withShortName("ml").create();
+
Option numReduceTasksOpt = obuilder.withLongName("numReducers").withArgument(
- abuilder.withName("numReducers").withMinimum(1).withMaximum(1).create()).withDescription(
- "(Optional) Number of reduce tasks. Default Value: 1").withShortName("nr").create();
-
+ abuilder.withName("numReducers").withMinimum(1).withMaximum(1).create()).withDescription(
+ "(Optional) Number of reduce tasks. Default Value: 1").withShortName("nr").create();
+
Option powerOpt = obuilder.withLongName("norm").withRequired(false).withArgument(
- abuilder.withName("norm").withMinimum(1).withMaximum(1).create()).withDescription(
- "The norm to use, expressed as either a float or \"INF\" if you want to use the Infinite norm. "
- + "Must be greater or equal to 0. The default is not to normalize").withShortName("n").create();
-
+ abuilder.withName("norm").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The norm to use, expressed as either a float or \"INF\" if you want to use the Infinite norm. "
+ + "Must be greater or equal to 0. The default is not to normalize").withShortName("n").create();
+
Option logNormalizeOpt = obuilder.withLongName("logNormalize").withRequired(false)
- .withDescription(
- "(Optional) Whether output vectors should be logNormalize. If set true else false")
- .withShortName("lnorm").create();
-
+ .withDescription(
+ "(Optional) Whether output vectors should be logNormalize. If set true else false")
+ .withShortName("lnorm").create();
+
Option maxNGramSizeOpt = obuilder.withLongName("maxNGramSize").withRequired(false).withArgument(
- abuilder.withName("ngramSize").withMinimum(1).withMaximum(1).create())
- .withDescription(
- "(Optional) The maximum size of ngrams to create"
- + " (2 = bigrams, 3 = trigrams, etc) Default Value:1").withShortName("ng").create();
-
+ abuilder.withName("ngramSize").withMinimum(1).withMaximum(1).create())
+ .withDescription(
+ "(Optional) The maximum size of ngrams to create"
+ + " (2 = bigrams, 3 = trigrams, etc) Default Value:1").withShortName("ng").create();
+
Option sequentialAccessVectorOpt = obuilder.withLongName("sequentialAccessVector").withRequired(false)
- .withDescription(
- "(Optional) Whether output vectors should be SequentialAccessVectors. If set true else false")
- .withShortName("seq").create();
-
+ .withDescription(
+ "(Optional) Whether output vectors should be SequentialAccessVectors. If set true else false")
+ .withShortName("seq").create();
+
Option namedVectorOpt = obuilder.withLongName("namedVector").withRequired(false)
- .withDescription(
- "(Optional) Whether output vectors should be NamedVectors. If set true else false")
- .withShortName("nv").create();
-
+ .withDescription(
+ "(Optional) Whether output vectors should be NamedVectors. If set true else false")
+ .withShortName("nv").create();
+
Option overwriteOutput = obuilder.withLongName("overwrite").withRequired(false).withDescription(
- "If set, overwrite the output directory").withShortName("ow").create();
+ "If set, overwrite the output directory").withShortName("ow").create();
Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h")
- .create();
-
+ .create();
+
Group group = gbuilder.withName("Options").withOption(minSupportOpt).withOption(analyzerNameOpt)
- .withOption(chunkSizeOpt).withOption(outputDirOpt).withOption(inputDirOpt).withOption(minDFOpt)
- .withOption(maxDFSigmaOpt).withOption(maxDFPercentOpt).withOption(weightOpt).withOption(powerOpt)
- .withOption(minLLROpt).withOption(numReduceTasksOpt).withOption(maxNGramSizeOpt).withOption(overwriteOutput)
- .withOption(helpOpt).withOption(sequentialAccessVectorOpt).withOption(namedVectorOpt)
- .withOption(logNormalizeOpt)
- .create();
+ .withOption(chunkSizeOpt).withOption(outputDirOpt).withOption(inputDirOpt).withOption(minDFOpt)
+ .withOption(maxDFSigmaOpt).withOption(maxDFPercentOpt).withOption(weightOpt).withOption(powerOpt)
+ .withOption(minLLROpt).withOption(numReduceTasksOpt).withOption(maxNGramSizeOpt).withOption(overwriteOutput)
+ .withOption(helpOpt).withOption(sequentialAccessVectorOpt).withOption(namedVectorOpt)
+ .withOption(logNormalizeOpt)
+ .create();
try {
Parser parser = new Parser();
parser.setGroup(group);
parser.setHelpOption(helpOpt);
CommandLine cmdLine = parser.parse(args);
-
+
if (cmdLine.hasOption(helpOpt)) {
CommandLineUtil.printHelp(group);
return -1;
}
-
+
Path inputDir = new Path((String) cmdLine.getValue(inputDirOpt));
Path outputDir = new Path((String) cmdLine.getValue(outputDirOpt));
-
+
int chunkSize = 100;
if (cmdLine.hasOption(chunkSizeOpt)) {
chunkSize = Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
@@ -169,9 +169,9 @@ public final class SparseVectorsFromSequ
String minSupportString = (String) cmdLine.getValue(minSupportOpt);
minSupport = Integer.parseInt(minSupportString);
}
-
+
int maxNGramSize = 1;
-
+
if (cmdLine.hasOption(maxNGramSizeOpt)) {
try {
maxNGramSize = Integer.parseInt(cmdLine.getValue(maxNGramSizeOpt).toString());
@@ -180,17 +180,17 @@ public final class SparseVectorsFromSequ
}
}
log.info("Maximum n-gram size is: {}", maxNGramSize);
-
+
if (cmdLine.hasOption(overwriteOutput)) {
HadoopUtil.delete(getConf(), outputDir);
}
-
+
float minLLRValue = LLRReducer.DEFAULT_MIN_LLR;
if (cmdLine.hasOption(minLLROpt)) {
minLLRValue = Float.parseFloat(cmdLine.getValue(minLLROpt).toString());
}
log.info("Minimum LLR value: {}", minLLRValue);
-
+
int reduceTasks = 1;
if (cmdLine.hasOption(numReduceTasksOpt)) {
reduceTasks = Integer.parseInt(cmdLine.getValue(numReduceTasksOpt).toString());
@@ -205,9 +205,9 @@ public final class SparseVectorsFromSequ
// you can't instantiate it
AnalyzerUtils.createAnalyzer(analyzerClass);
}
-
+
boolean processIdf;
-
+
if (cmdLine.hasOption(weightOpt)) {
String wString = cmdLine.getValue(weightOpt).toString();
if ("tf".equalsIgnoreCase(wString)) {
@@ -220,7 +220,7 @@ public final class SparseVectorsFromSequ
} else {
processIdf = true;
}
-
+
int minDf = 1;
if (cmdLine.hasOption(minDFOpt)) {
minDf = Integer.parseInt(cmdLine.getValue(minDFOpt).toString());
@@ -233,7 +233,7 @@ public final class SparseVectorsFromSequ
if (cmdLine.hasOption(maxDFSigmaOpt)) {
maxDFSigma = Double.parseDouble(cmdLine.getValue(maxDFSigmaOpt).toString());
}
-
+
float norm = PartialVectorMerger.NO_NORMALIZING;
if (cmdLine.hasOption(powerOpt)) {
String power = cmdLine.getValue(powerOpt).toString();
@@ -243,12 +243,12 @@ public final class SparseVectorsFromSequ
norm = Float.parseFloat(power);
}
}
-
+
boolean logNormalize = false;
if (cmdLine.hasOption(logNormalizeOpt)) {
logNormalize = true;
}
-
+ log.info("Tokenizing documents in {}", inputDir);
Configuration conf = getConf();
Path tokenizedPath = new Path(outputDir, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER);
//TODO: move this into DictionaryVectorizer , and then fold SparseVectorsFrom with EncodedVectorsFrom
@@ -264,98 +264,99 @@ public final class SparseVectorsFromSequ
if (cmdLine.hasOption(namedVectorOpt)) {
namedVectors = true;
}
- boolean shouldPrune = maxDFSigma >= 0.0 || maxDFPercent > 0.00;
+ boolean shouldPrune = maxDFSigma >= 0.0 || maxDFPercent > 0.00;
String tfDirName = shouldPrune
- ? DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER + "-toprune"
- : DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER;
-
+ ? DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER + "-toprune"
+ : DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER;
+ log.info("Creating Term Frequency Vectors");
if (processIdf) {
DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath,
- outputDir,
- tfDirName,
- conf,
- minSupport,
- maxNGramSize,
- minLLRValue,
- -1.0f,
- false,
- reduceTasks,
- chunkSize,
- sequentialAccessOutput,
- namedVectors);
+ outputDir,
+ tfDirName,
+ conf,
+ minSupport,
+ maxNGramSize,
+ minLLRValue,
+ -1.0f,
+ false,
+ reduceTasks,
+ chunkSize,
+ sequentialAccessOutput,
+ namedVectors);
} else {
DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath,
- outputDir,
- tfDirName,
- conf,
- minSupport,
- maxNGramSize,
- minLLRValue,
- norm,
- logNormalize,
- reduceTasks,
- chunkSize,
- sequentialAccessOutput,
- namedVectors);
+ outputDir,
+ tfDirName,
+ conf,
+ minSupport,
+ maxNGramSize,
+ minLLRValue,
+ norm,
+ logNormalize,
+ reduceTasks,
+ chunkSize,
+ sequentialAccessOutput,
+ namedVectors);
}
Pair<Long[], List<Path>> docFrequenciesFeatures = null;
// Should document frequency features be processed
if (shouldPrune || processIdf) {
+ log.info("Calculating IDF");
docFrequenciesFeatures =
- TFIDFConverter.calculateDF(new Path(outputDir, tfDirName),outputDir, conf, chunkSize);
+ TFIDFConverter.calculateDF(new Path(outputDir, tfDirName), outputDir, conf, chunkSize);
}
long maxDF = maxDFPercent; //if we are pruning by std dev, then this will get changed
if (shouldPrune) {
- long vectorCount = docFrequenciesFeatures.getFirst()[1];
- if (maxDFSigma >= 0.0) {
- Path dfDir = new Path(outputDir, TFIDFConverter.WORDCOUNT_OUTPUT_FOLDER);
- Path stdCalcDir = new Path(outputDir, HighDFWordsPruner.STD_CALC_DIR);
-
- // Calculate the standard deviation
- double stdDev = BasicStats.stdDevForGivenMean(dfDir, stdCalcDir, 0.0, conf);
- maxDF = (int) (100.0 * maxDFSigma * stdDev / vectorCount);
- }
+ long vectorCount = docFrequenciesFeatures.getFirst()[1];
+ if (maxDFSigma >= 0.0) {
+ Path dfDir = new Path(outputDir, TFIDFConverter.WORDCOUNT_OUTPUT_FOLDER);
+ Path stdCalcDir = new Path(outputDir, HighDFWordsPruner.STD_CALC_DIR);
+
+ // Calculate the standard deviation
+ double stdDev = BasicStats.stdDevForGivenMean(dfDir, stdCalcDir, 0.0, conf);
+ maxDF = (int) (100.0 * maxDFSigma * stdDev / vectorCount);
+ }
- long maxDFThreshold = (long) (vectorCount * (maxDF / 100.0f));
+ long maxDFThreshold = (long) (vectorCount * (maxDF / 100.0f));
// Prune the term frequency vectors
Path tfDir = new Path(outputDir, tfDirName);
Path prunedTFDir = new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER);
Path prunedPartialTFDir =
- new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER + "-partial");
-
+ new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER + "-partial");
+ log.info("Pruning");
if (processIdf) {
HighDFWordsPruner.pruneVectors(tfDir,
- prunedTFDir,
- prunedPartialTFDir,
- maxDFThreshold,
- minDf,
- conf,
- docFrequenciesFeatures,
- -1.0f,
- false,
- reduceTasks);
+ prunedTFDir,
+ prunedPartialTFDir,
+ maxDFThreshold,
+ minDf,
+ conf,
+ docFrequenciesFeatures,
+ -1.0f,
+ false,
+ reduceTasks);
} else {
HighDFWordsPruner.pruneVectors(tfDir,
- prunedTFDir,
- prunedPartialTFDir,
- maxDFThreshold,
- minDf,
- conf,
- docFrequenciesFeatures,
- norm,
- logNormalize,
- reduceTasks);
+ prunedTFDir,
+ prunedPartialTFDir,
+ maxDFThreshold,
+ minDf,
+ conf,
+ docFrequenciesFeatures,
+ norm,
+ logNormalize,
+ reduceTasks);
}
HadoopUtil.delete(new Configuration(conf), tfDir);
}
if (processIdf) {
TFIDFConverter.processTfIdf(
- new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER),
- outputDir, conf, docFrequenciesFeatures, minDf, maxDF, norm, logNormalize,
- sequentialAccessOutput, namedVectors, reduceTasks);
+ new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER),
+ outputDir, conf, docFrequenciesFeatures, minDf, maxDF, norm, logNormalize,
+ sequentialAccessOutput, namedVectors, reduceTasks);
}
} catch (OptionException e) {
log.error("Exception", e);
@@ -363,5 +364,5 @@ public final class SparseVectorsFromSequ
}
return 0;
}
-
+
}
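
A worked, illustrative computation of the --maxDFSigma pruning threshold as coded above; all figures are
hypothetical:

    public class MaxDfSigmaDemo {
      public static void main(String[] args) {
        long vectorCount = 10000;   // assumed number of TF vectors
        double stdDev = 500.0;      // assumed std. deviation of the document frequencies
        double maxDFSigma = 3.0;    // value passed via --maxDFSigma
        // same arithmetic as SparseVectorsFromSequenceFiles: sigma is converted to a percentage of the
        // corpus, and that percentage back into an absolute document count used for pruning
        long maxDF = (int) (100.0 * maxDFSigma * stdDev / vectorCount); // -> 15 (percent)
        long maxDFThreshold = (long) (vectorCount * (maxDF / 100.0f));  // -> 1500 documents
        System.out.println("Prune terms occurring in more than " + maxDFThreshold + " of "
            + vectorCount + " documents (maxDF = " + maxDF + "%)");
      }
    }
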
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java Sun Jun 9 12:17:45 2013
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditi
import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
@@ -40,6 +42,8 @@ import org.apache.mahout.math.VectorWrit
import org.apache.mahout.math.map.OpenObjectIntHashMap;
import org.apache.mahout.vectorizer.DictionaryVectorizer;
import org.apache.mahout.vectorizer.common.PartialVectorMerger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
@@ -49,7 +53,7 @@ import java.util.Iterator;
* Converts a document in to a sparse vector
*/
public class TFPartialVectorReducer extends Reducer<Text, StringTuple, Text, VectorWritable> {
-
+ private transient static Logger log = LoggerFactory.getLogger(TFPartialVectorReducer.class);
private final OpenObjectIntHashMap<String> dictionary = new OpenObjectIntHashMap<String>();
private int dimension;
@@ -62,7 +66,7 @@ public class TFPartialVectorReducer exte
@Override
protected void reduce(Text key, Iterable<StringTuple> values, Context context)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
Iterator<StringTuple> it = values.iterator();
if (!it.hasNext()) {
return;
@@ -119,7 +123,18 @@ public class TFPartialVectorReducer exte
Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
Preconditions.checkArgument(localFiles != null && localFiles.length >= 1,
"missing paths from the DistributedCache");
-
+ LocalFileSystem localFs = FileSystem.getLocal(conf);
+ if (!localFs.exists(localFiles[0])) {
+ log.info("Can't find dictionary dist. cache file, looking in .getCacheFiles");
+ URI[] filesURIs = DistributedCache.getCacheFiles(conf);
+ if (filesURIs == null) {
+ throw new IOException("Cannot read Frequency list from Distributed Cache");
+ }
+ if (filesURIs.length != 1) {
+ throw new IOException("Cannot read Frequency list from Distributed Cache (" + localFiles.length + ')');
+ }
+ localFiles[0] = new Path(filesURIs[0].getPath());
+ }
dimension = conf.getInt(PartialVectorMerger.DIMENSION, Integer.MAX_VALUE);
sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
namedVector = conf.getBoolean(PartialVectorMerger.NAMED_VECTOR, false);
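
The setup change above falls back from the localized cache copy to DistributedCache.getCacheFiles() when no
local copy exists (e.g. in Hadoop local mode, where files are not localized). A hedged, stand-alone sketch of
that lookup pattern as a helper; the class and method names are illustrative:

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;

    public final class CacheFileResolver {
      private CacheFileResolver() { }

      // prefer the localized copy of the first cache file, fall back to the original cache URI
      public static Path resolveFirstCacheFile(Configuration conf) throws IOException {
        Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
        if (localFiles == null || localFiles.length < 1) {
          throw new IOException("missing paths from the DistributedCache");
        }
        LocalFileSystem localFs = FileSystem.getLocal(conf);
        if (localFs.exists(localFiles[0])) {
          return localFiles[0];
        }
        URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
        if (cacheFiles == null || cacheFiles.length != 1) {
          throw new IOException("Cannot read dictionary from Distributed Cache");
        }
        return new Path(cacheFiles[0].getPath());
      }
    }
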
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java Sun Jun 9 12:17:45 2013
@@ -24,6 +24,8 @@ import java.util.Iterator;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -103,7 +105,17 @@ public class TFIDFPartialVectorReducer e
Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
Preconditions.checkArgument(localFiles != null && localFiles.length >= 1,
"missing paths from the DistributedCache");
-
+ LocalFileSystem localFs = FileSystem.getLocal(conf);
+ if (!localFs.exists(localFiles[0])) {
+ URI[] filesURIs = DistributedCache.getCacheFiles(conf);
+ if (filesURIs == null) {
+ throw new IOException("Cannot read Frequency list from Distributed Cache");
+ }
+ if (filesURIs.length != 1) {
+ throw new IOException("Cannot read Frequency list from Distributed Cache (" + localFiles.length + ')');
+ }
+ localFiles[0] = new Path(filesURIs[0].getPath());
+ }
vectorCount = conf.getLong(TFIDFConverter.VECTOR_COUNT, 1);
featureCount = conf.getLong(TFIDFConverter.FEATURE_COUNT, 1);
minDf = conf.getInt(TFIDFConverter.MIN_DF, 1);
Modified: mahout/trunk/examples/bin/cluster-reuters.sh
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/bin/cluster-reuters.sh?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/examples/bin/cluster-reuters.sh (original)
+++ mahout/trunk/examples/bin/cluster-reuters.sh Sun Jun 9 12:17:45 2013
@@ -90,15 +90,19 @@ if [ ! -e ${WORK_DIR}/reuters-out-seqdir
tar xzf ${WORK_DIR}/reuters21578.tar.gz -C ${WORK_DIR}/reuters-sgm
fi
+ echo "Extracting Reuters"
+ $MAHOUT org.apache.lucene.benchmark.utils.ExtractReuters ${WORK_DIR}/reuters-sgm ${WORK_DIR}/reuters-out
if [ "$HADOOP_HOME" != "" ] && [ "$MAHOUT_LOCAL" == "" ] ; then
+ echo "Copying Reuters data to Hadoop"
set +e
$HADOOP dfs -rmr ${WORK_DIR}/reuters-sgm
+ $HADOOP dfs -rmr ${WORK_DIR}/reuters-out
set -e
- $HADOOP dfs -put ${WORK_DIR}/reuters-sgm ${WORK_DIR}/reuters-sgm
- fi
- $MAHOUT org.apache.lucene.benchmark.utils.ExtractReuters ${WORK_DIR}/reuters-sgm ${WORK_DIR}/reuters-out
+ $HADOOP dfs -put ${WORK_DIR}/reuters-sgm ${WORK_DIR}/reuters-sgm
+ $HADOOP dfs -put ${WORK_DIR}/reuters-out ${WORK_DIR}/reuters-out
+ fi
fi
-
+ echo "Converting to Sequence Files from Directory"
$MAHOUT seqdirectory -i ${WORK_DIR}/reuters-out -o ${WORK_DIR}/reuters-out-seqdir -c UTF-8 -chunk 5
fi