Posted to commits@mahout.apache.org by gs...@apache.org on 2013/06/09 14:17:45 UTC

svn commit: r1491191 - in /mahout/trunk: core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ core/src/main/java/org/apache/mahout/vectorizer/ core/src/main/java/org/apache/mahout/vectorizer/term/ core/src/main/java/org/apache/mahout/...

Author: gsingers
Date: Sun Jun  9 12:17:45 2013
New Revision: 1491191

URL: http://svn.apache.org/r1491191
Log:
MAHOUT-1103: properly partition the data for MapReduce
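
The crux of the change: the post-processor previously keyed its map output on the raw cluster id as Text, so even with one reduce task per cluster the hashed keys did not map clusters one-to-one onto reducers. The patch remaps the final cluster ids onto contiguous indices 0..k-1, emits those as IntWritable keys, and keeps the number of reduce tasks at k (one per final cluster), so Hadoop's default hash partitioning gives every cluster its own reducer and part file; the reducer then translates each index back to the original id. A minimal, self-contained sketch of that arithmetic (plain Java, with cluster ids invented for illustration):

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class PartitionSketch {
      public static void main(String[] args) {
        int[] finalClusterIds = {3, 17, 42};            // hypothetical ids read from clusters-*-final
        Map<Integer, Integer> idToIndex = new LinkedHashMap<Integer, Integer>();
        Map<Integer, Integer> indexToId = new HashMap<Integer, Integer>();
        int i = 0;
        for (int id : finalClusterIds) {                // same forward/reverse maps getClusterIDs builds
          idToIndex.put(id, i);
          indexToId.put(i, id);
          i++;
        }
        int numReduceTasks = finalClusterIds.length;    // the driver sets job.setNumReduceTasks(k)
        for (Map.Entry<Integer, Integer> e : idToIndex.entrySet()) {
          int mapOutputKey = e.getValue();              // what the mapper now emits as an IntWritable
          // Hadoop's default HashPartitioner computes (hashCode & Integer.MAX_VALUE) % numReduceTasks,
          // and an IntWritable hashes to its int value, so index j always lands on reducer j.
          int partition = (mapOutputKey & Integer.MAX_VALUE) % numReduceTasks;
          System.out.println("cluster " + e.getKey() + " -> key " + mapOutputKey
              + " -> reducer " + partition + " -> restored id " + indexToId.get(mapOutputKey));
        }
      }
    }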

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java
    mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
    mahout/trunk/examples/bin/cluster-reuters.sh

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java Sun Jun  9 12:17:45 2013
@@ -17,18 +17,21 @@
 
 package org.apache.mahout.clustering.topdown.postprocessor;
 
-import java.io.IOException;
-import java.util.Iterator;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
 import org.apache.mahout.common.iterator.sequencefile.PathFilters;
 import org.apache.mahout.common.iterator.sequencefile.PathType;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
 /**
  * Reads the number of clusters produced by the clustering algorithm.
  */
@@ -39,11 +42,9 @@ public final class ClusterCountReader {
 
   /**
    * Reads the number of clusters present by reading the clusters-*-final file.
-   * 
-   * @param clusterOutputPath
-   *          The output path provided to the clustering algorithm.
-   * @param conf
-   *          The hadoop configuration.
+   *
+   * @param clusterOutputPath The output path provided to the clustering algorithm.
+   * @param conf              The hadoop configuration.
    * @return the number of final clusters.
    */
   public static int getNumberOfClusters(Path clusterOutputPath, Configuration conf) throws IOException {
@@ -51,11 +52,11 @@ public final class ClusterCountReader {
     FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
     int numberOfClusters = 0;
     Iterator<?> it = new SequenceFileDirValueIterator<Writable>(clusterFiles[0].getPath(),
-                                                                PathType.LIST,
-                                                                PathFilters.partFilter(),
-                                                                null,
-                                                                true,
-                                                                conf);
+            PathType.LIST,
+            PathFilters.partFilter(),
+            null,
+            true,
+            conf);
     while (it.hasNext()) {
       it.next();
       numberOfClusters++;
@@ -63,4 +64,38 @@ public final class ClusterCountReader {
     return numberOfClusters;
   }
 
+  /**
+   * Generates a mapping of all cluster ids by reading the clusters-*-final file.
+   *
+   * @param clusterOutputPath The output path provided to the clustering algorithm.
+   * @param conf              The hadoop configuration.
+   * @return A Map of the final cluster ids: cluster id to index if keyIsClusterId is true, index to cluster id otherwise.
+   */
+  public static Map<Integer, Integer> getClusterIDs(Path clusterOutputPath, Configuration conf, boolean keyIsClusterId) throws IOException {
+    Map<Integer, Integer> clusterIds = new HashMap<Integer, Integer>();
+    FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
+    FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
+    //System.out.println("LOOK HERE: " + clusterOutputPath);
+    Iterator<ClusterWritable> it = new SequenceFileDirValueIterator<ClusterWritable>(clusterFiles[0].getPath(),
+            PathType.LIST,
+            PathFilters.partFilter(),
+            null,
+            true,
+            conf);
+    int i = 0;
+    while (it.hasNext()) {
+      Integer key, value;
+      if (keyIsClusterId) { // key is the cluster id, value is i, the index we will use
+        key = it.next().getValue().getId();
+        value = i;
+      } else {
+        key = i;
+        value = it.next().getValue().getId();
+      }
+      clusterIds.put(key, value);
+      i++;
+    }
+    return clusterIds;
+  }
+
 }
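
A minimal usage sketch of the new getClusterIDs helper (the clustering output path below is hypothetical). The keyIsClusterId flag picks the direction of the mapping, and the two directions are exactly what the mapper and reducer in this commit need:

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.clustering.topdown.postprocessor.ClusterCountReader;

    public class ClusterIdLookupSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path clusterOutput = new Path("/tmp/kmeans-output");   // directory containing clusters-*-final

        // keyIsClusterId == true: cluster id -> contiguous index 0..k-1 (what the mapper uses)
        Map<Integer, Integer> idToIndex = ClusterCountReader.getClusterIDs(clusterOutput, conf, true);

        // keyIsClusterId == false: contiguous index -> original cluster id (what the reducer uses)
        Map<Integer, Integer> indexToId = ClusterCountReader.getClusterIDs(clusterOutput, conf, false);

        System.out.println("forward: " + idToIndex + "  reverse: " + indexToId);
      }
    }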

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java Sun Jun  9 12:17:45 2013
@@ -17,10 +17,6 @@
 
 package org.apache.mahout.clustering.topdown.postprocessor;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,77 +33,80 @@ import org.apache.mahout.common.iterator
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
 import org.apache.mahout.math.VectorWritable;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * This class reads the output of any clustering algorithm, and, creates separate directories for different
  * clusters. Each cluster directory's name is its clusterId. Each and every point is written in the cluster
  * directory associated with that point.
- * 
+ * <p/>
  * This class incorporates a sequential algorithm and is appropriate for use for data which has been clustered
  * sequentially.
- * 
+ * <p/>
  * The sequential and non sequential version, both are being used from {@link ClusterOutputPostProcessorDriver}.
  */
 public final class ClusterOutputPostProcessor {
-  
+
   private Path clusteredPoints;
   private final FileSystem fileSystem;
   private final Configuration conf;
   private final Path clusterPostProcessorOutput;
-  private final Map<String,Path> postProcessedClusterDirectories = new HashMap<String,Path>();
+  private final Map<String, Path> postProcessedClusterDirectories = new HashMap<String, Path>();
   private long uniqueVectorId = 0L;
-  private final Map<String,SequenceFile.Writer> writersForClusters;
-  
+  private final Map<String, SequenceFile.Writer> writersForClusters;
+
   public ClusterOutputPostProcessor(Path clusterOutputToBeProcessed,
                                     Path output,
                                     Configuration hadoopConfiguration) throws IOException {
     this.clusterPostProcessorOutput = output;
     this.clusteredPoints = PathDirectory.getClusterOutputClusteredPoints(clusterOutputToBeProcessed);
     this.conf = hadoopConfiguration;
-    this.writersForClusters = new HashMap<String,SequenceFile.Writer>();
-    fileSystem = clusteredPoints.getFileSystem(conf);    
+    this.writersForClusters = new HashMap<String, SequenceFile.Writer>();
+    fileSystem = clusteredPoints.getFileSystem(conf);
   }
-  
+
   /**
    * This method takes the clustered points output by the clustering algorithms as input and writes them into
    * their respective clusters.
    */
   public void process() throws IOException {
     createPostProcessDirectory();
-    for (Pair<?,WeightedVectorWritable> record
-        : new SequenceFileDirIterable<Writable,WeightedVectorWritable>(clusteredPoints,
-                                                                      PathType.GLOB,
-                                                                      PathFilters.partFilter(),
-                                                                      null,
-                                                                      false,
-                                                                      conf)) {
+    for (Pair<?, WeightedVectorWritable> record
+            : new SequenceFileDirIterable<Writable, WeightedVectorWritable>(clusteredPoints,
+            PathType.GLOB,
+            PathFilters.partFilter(),
+            null,
+            false,
+            conf)) {
       String clusterId = record.getFirst().toString().trim();
       putVectorInRespectiveCluster(clusterId, record.getSecond());
     }
     IOUtils.close(writersForClusters.values());
     writersForClusters.clear();
   }
-  
+
   /**
    * Creates the directory to put post processed clusters.
    */
   private void createPostProcessDirectory() throws IOException {
     if (!fileSystem.exists(clusterPostProcessorOutput)
-        && !fileSystem.mkdirs(clusterPostProcessorOutput)) {
+            && !fileSystem.mkdirs(clusterPostProcessorOutput)) {
       throw new IOException("Error creating cluster post processor directory");
     }
   }
-  
+
   /**
-   * 
    * Finds out the cluster directory of the vector and writes it into the specified cluster.
    */
   private void putVectorInRespectiveCluster(String clusterId, WeightedVectorWritable point) throws IOException {
     Writer writer = findWriterForVector(clusterId);
     postProcessedClusterDirectories.put(clusterId,
-        PathDirectory.getClusterPathForClusterId(clusterPostProcessorOutput, clusterId));
+            PathDirectory.getClusterPathForClusterId(clusterPostProcessorOutput, clusterId));
     writeVectorToCluster(writer, point);
   }
-  
+
   /**
    * Finds out the path in cluster where the point is supposed to be written.
    */
@@ -121,7 +120,7 @@ public final class ClusterOutputPostProc
     }
     return writer;
   }
-  
+
   /**
    * Writes vector to the cluster directory.
    */
@@ -129,16 +128,16 @@ public final class ClusterOutputPostProc
     writer.append(new LongWritable(uniqueVectorId++), new VectorWritable(point.getVector()));
     writer.sync();
   }
-  
+
   /**
    * @return the set of all post processed cluster paths.
    */
-  public Map<String,Path> getPostProcessedClusterDirectories() {
+  public Map<String, Path> getPostProcessedClusterDirectories() {
     return postProcessedClusterDirectories;
   }
-  
+
   public void setClusteredPoints(Path clusteredPoints) {
     this.clusteredPoints = clusteredPoints;
   }
-  
+
 }
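
For context, a small sketch of driving the sequential post-processor directly (paths invented); this is the code path the --sequential option exercises:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.clustering.topdown.postprocessor.ClusterOutputPostProcessor;

    public class SequentialPostProcessSketch {
      public static void main(String[] args) throws IOException {
        // input: the clustering output directory (the one containing clusteredPoints)
        Path clusteringOutput = new Path("/tmp/kmeans-output");
        // output: one sub-directory per cluster id will be created here
        Path postProcessedOutput = new Path("/tmp/kmeans-output-postprocessed");
        ClusterOutputPostProcessor postProcessor =
            new ClusterOutputPostProcessor(clusteringOutput, postProcessedOutput, new Configuration());
        postProcessor.process();
      }
    }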

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java Sun Jun  9 12:17:45 2013
@@ -17,13 +17,11 @@
 
 package org.apache.mahout.clustering.topdown.postprocessor;
 
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -32,71 +30,74 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
 import org.apache.mahout.common.iterator.sequencefile.PathFilters;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
 import org.apache.mahout.math.VectorWritable;
 
+import java.io.IOException;
+
 /**
  * Post processes the output of clustering algorithms and groups them into respective clusters. Ideal to be
  * used for top down clustering. It can also be used if the clustering output needs to be grouped into their
  * respective clusters.
  */
 public final class ClusterOutputPostProcessorDriver extends AbstractJob {
-  
+
   /**
    * CLI to run clustering post processor. The input to post processor is the ouput path specified to the
    * clustering.
    */
   @Override
   public int run(String[] args) throws Exception {
-    
     addInputOption();
     addOutputOption();
     addOption(DefaultOptionCreator.methodOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
 
     if (parseArguments(args) == null) {
       return -1;
     }
-    
     Path input = getInputPath();
     Path output = getOutputPath();
 
     if (getConf() == null) {
       setConf(new Configuration());
     }
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), output);
+    }
     boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
-      DefaultOptionCreator.SEQUENTIAL_METHOD);
+            DefaultOptionCreator.SEQUENTIAL_METHOD);
     run(input, output, runSequential);
     return 0;
-    
+
   }
-  
+
   /**
    * Constructor to be used by the ToolRunner.
    */
-  private ClusterOutputPostProcessorDriver() {}
-  
+  private ClusterOutputPostProcessorDriver() {
+  }
+
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new Configuration(), new ClusterOutputPostProcessorDriver(), args);
   }
-  
+
   /**
    * Post processes the output of clustering algorithms and groups them into respective clusters. Each
    * cluster's vectors are written into a directory named after its clusterId.
-   * 
-   * @param input
-   *          The output path provided to the clustering algorithm, whose would be post processed. Hint : The
-   *          path of the directory containing clusters-*-final and clusteredPoints.
-   * @param output
-   *          The post processed data would be stored at this path.
-   * @param runSequential
-   *          If set to true, post processes it sequentially, else, uses. MapReduce. Hint : If the clustering
-   *          was done sequentially, make it sequential, else vice versa.
+   *
+   * @param input         The output path provided to the clustering algorithm, which is to be post processed. Hint: the
+   *                      path of the directory containing clusters-*-final and clusteredPoints.
+   * @param output        The post processed data will be stored at this path.
+   * @param runSequential If set to true, post processes sequentially, else uses MapReduce. Hint: if the clustering
+   *                      was done sequentially, make it sequential, else vice versa.
    */
   public static void run(Path input, Path output, boolean runSequential) throws IOException,
-                                                                        InterruptedException,
-                                                                        ClassNotFoundException {
+          InterruptedException,
+          ClassNotFoundException {
     if (runSequential) {
       postProcessSeq(input, output);
     } else {
@@ -104,81 +105,76 @@ public final class ClusterOutputPostProc
       postProcessMR(conf, input, output);
       movePartFilesToRespectiveDirectories(conf, output);
     }
-    
+
   }
-  
+
   /**
    * Process Sequentially. Reads the vectors one by one, and puts them into respective directory, named after
    * their clusterId.
-   * 
-   * @param input
-   *          The output path provided to the clustering algorithm, whose would be post processed. Hint : The
-   *          path of the directory containing clusters-*-final and clusteredPoints.
-   * @param output
-   *          The post processed data would be stored at this path.
+   *
+   * @param input  The output path provided to the clustering algorithm, which is to be post processed. Hint: the
+   *               path of the directory containing clusters-*-final and clusteredPoints.
+   * @param output The post processed data will be stored at this path.
    */
   private static void postProcessSeq(Path input, Path output) throws IOException {
     ClusterOutputPostProcessor clusterOutputPostProcessor = new ClusterOutputPostProcessor(input, output,
-        new Configuration());
+            new Configuration());
     clusterOutputPostProcessor.process();
   }
-  
+
   /**
    * Process as a map reduce job. The numberOfReduceTasks is set to the number of clusters present in the
    * output. So that each cluster's vector is written in its own part file.
-   * 
-   * @param conf
-   *          The hadoop configuration.
-   * @param input
-   *          The output path provided to the clustering algorithm, whose would be post processed. Hint : The
-   *          path of the directory containing clusters-*-final and clusteredPoints.
-   * @param output
-   *          The post processed data would be stored at this path.
+   *
+   * @param conf   The hadoop configuration.
+   * @param input  The output path provided to the clustering algorithm, which is to be post processed. Hint: the
+   *               path of the directory containing clusters-*-final and clusteredPoints.
+   * @param output The post processed data will be stored at this path.
    */
   private static void postProcessMR(Configuration conf, Path input, Path output) throws IOException,
-                                                                                InterruptedException,
-                                                                                ClassNotFoundException {
+          InterruptedException,
+          ClassNotFoundException {
+    System.out.println("WARNING: If you are running in Hadoop local mode, please use the --sequential option, as the MapReduce option will not work properly");
+    int numberOfClusters = ClusterCountReader.getNumberOfClusters(input, conf);
+    conf.set("clusterOutputPath", input.toString());
     Job job = new Job(conf, "ClusterOutputPostProcessor Driver running over input: " + input);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setMapperClass(ClusterOutputPostProcessorMapper.class);
-    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputKeyClass(IntWritable.class);
     job.setMapOutputValueClass(VectorWritable.class);
     job.setReducerClass(ClusterOutputPostProcessorReducer.class);
-    job.setOutputKeyClass(Text.class);
+    job.setOutputKeyClass(IntWritable.class);
     job.setOutputValueClass(VectorWritable.class);
-    int numberOfClusters = ClusterCountReader.getNumberOfClusters(input, conf);
     job.setNumReduceTasks(numberOfClusters);
     job.setJarByClass(ClusterOutputPostProcessorDriver.class);
-    
+
     FileInputFormat.addInputPath(job, new Path(input, new Path("clusteredPoints")));
     FileOutputFormat.setOutputPath(job, output);
     if (!job.waitForCompletion(true)) {
       throw new InterruptedException("ClusterOutputPostProcessor Job failed processing " + input);
     }
   }
-  
+
   /**
    * The mapreduce version of the post processor writes different clusters into different part files. This
    * method reads the part files and moves them into directories named after their clusterIds.
-   * 
-   * @param conf
-   *          The hadoop configuration.
-   * @param output
-   *          The post processed data would be stored at this path.
+   *
+   * @param conf   The hadoop configuration.
+   * @param output The post processed data will be stored at this path.
    */
   private static void movePartFilesToRespectiveDirectories(Configuration conf, Path output) throws IOException {
     FileSystem fileSystem = output.getFileSystem(conf);
     for (FileStatus fileStatus : fileSystem.listStatus(output, PathFilters.partFilter())) {
-      SequenceFileIterator<Writable,Writable> it =
-          new SequenceFileIterator<Writable,Writable>(fileStatus.getPath(), true, conf);
+      SequenceFileIterator<Writable, Writable> it =
+              new SequenceFileIterator<Writable, Writable>(fileStatus.getPath(), true, conf);
       if (it.hasNext()) {
         renameFile(it.next().getFirst(), fileStatus, conf);
       }
       it.close();
     }
   }
-  
+
   /**
    * Using @FileSystem rename method to move the file.
    */
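
After the MapReduce pass each part file holds exactly one cluster (thanks to the 0..k-1 keying), so movePartFilesToRespectiveDirectories only has to read a file's first key and relocate the whole file. A rough sketch of that step; the renameFile helper itself is not shown in this hunk, so the directory layout below is an assumption based on the class javadoc:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Writable;
    import org.apache.mahout.common.iterator.sequencefile.PathFilters;
    import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;

    public final class MovePartFilesSketch {
      public static void move(Configuration conf, Path output) throws IOException {
        FileSystem fs = output.getFileSystem(conf);
        for (FileStatus part : fs.listStatus(output, PathFilters.partFilter())) {
          SequenceFileIterator<Writable, Writable> it =
              new SequenceFileIterator<Writable, Writable>(part.getPath(), true, conf);
          if (it.hasNext()) {
            // the reducer wrote the original cluster id back as the key
            Writable clusterId = it.next().getFirst();
            Path clusterDir = new Path(output, clusterId.toString());
            fs.mkdirs(clusterDir);
            fs.rename(part.getPath(), new Path(clusterDir, part.getPath().getName()));
          }
          it.close();
        }
      }
    }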

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java Sun Jun  9 12:17:45 2013
@@ -17,26 +17,41 @@
 
 package org.apache.mahout.clustering.topdown.postprocessor;
 
-import java.io.IOException;
-
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.clustering.classify.WeightedVectorWritable;
 import org.apache.mahout.math.VectorWritable;
 
+import java.io.IOException;
+import java.util.Map;
+
 /**
  * Mapper for post processing cluster output.
  */
 public class ClusterOutputPostProcessorMapper extends
-    Mapper<IntWritable,WeightedVectorWritable,Text,VectorWritable> {
-  
-  /**
-   * The key is the cluster id and the value is the vector.
-   */
+        Mapper<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> {
+
+  private Map<Integer, Integer> newClusterMappings;
+  private VectorWritable outputVector;
+
+  //read the current cluster ids, and populate the cluster mapping hash table
+  @Override
+  public void setup(Context context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    //this gives the clusters-x-final directory where the cluster ids can be read
+    Path clusterOutputPath = new Path(conf.get("clusterOutputPath"));
+    //we want the key to be the cluster id, the value to be the index
+    newClusterMappings = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, true);
+    outputVector = new VectorWritable();
+  }
+
   @Override
-  protected void map(IntWritable key, WeightedVectorWritable vector, Context context) throws IOException,
-                                                                                     InterruptedException {
-    context.write(new Text(key.toString().trim()), new VectorWritable(vector.getVector()));
+  public void map(IntWritable key, WeightedVectorWritable val, Context context) throws IOException, InterruptedException {
+    //by pivoting on the cluster mapping value, we can make sure that each unique cluster goes to its own reducer, since they
+    //are numbered from 0 to k-1, where k is the number of clusters
+    outputVector.set(val.getVector());
+    context.write(new IntWritable(newClusterMappings.get(key.get())), outputVector);
   }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java Sun Jun  9 12:17:45 2013
@@ -17,25 +17,46 @@
 
 package org.apache.mahout.clustering.topdown.postprocessor;
 
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.math.VectorWritable;
 
+import java.io.IOException;
+import java.util.Map;
+
 /**
  * Reducer for post processing cluster output.
  */
-public class ClusterOutputPostProcessorReducer extends Reducer<Text,VectorWritable,Text,VectorWritable> {
+public class ClusterOutputPostProcessorReducer extends Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+
+  private Map<Integer, Integer> reverseClusterMappings;
+
+  //read the current cluster ids, and populate the cluster mapping hash table
+  @Override
+  public void setup(Context context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    Path clusterOutputPath = new Path(conf.get("clusterOutputPath"));
+    //we want the key to be the index, the value to be the cluster id
+    reverseClusterMappings = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, false);
+  }
+
   /**
-   * The key is the cluster id and the values contains the points in that cluster.
+   * The key is the remapped cluster id and the values contain the vectors in that cluster.
    */
   @Override
-  protected void reduce(Text key, Iterable<VectorWritable> values, Context context) throws IOException,
-                                                                                   InterruptedException {
+  protected void reduce(IntWritable key, Iterable<VectorWritable> values, Context context) throws IOException,
+          InterruptedException {
+    //remap the cluster back to its original id
+    //and then output the vectors with their correct
+    //cluster id.
+    IntWritable outKey = new IntWritable(reverseClusterMappings.get(key.get()));
+    System.out.println(outKey + " this: " + this);
     for (VectorWritable value : values) {
-      context.write(key, value);
+      context.write(outKey, value);
     }
   }
-  
+
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java Sun Jun  9 12:17:45 2013
@@ -52,6 +52,8 @@ import org.apache.mahout.vectorizer.term
 import org.apache.mahout.vectorizer.term.TermCountCombiner;
 import org.apache.mahout.vectorizer.term.TermCountMapper;
 import org.apache.mahout.vectorizer.term.TermCountReducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class converts a set of input documents in the sequence file format to vectors. The Sequence file
@@ -60,6 +62,7 @@ import org.apache.mahout.vectorizer.term
  * This is a dictionary based Vectorizer.
  */
 public final class DictionaryVectorizer implements Vectorizer {
+  private static Logger log = LoggerFactory.getLogger(DictionaryVectorizer.class);
   
   public static final String DOCUMENT_VECTOR_OUTPUT_FOLDER = "tf-vectors";
   public static final String MIN_SUPPORT = "min.support";
@@ -167,6 +170,7 @@ public final class DictionaryVectorizer 
     
     int[] maxTermDimension = new int[1];
     List<Path> dictionaryChunks;
+    log.info("Creating dictionary from {} and saving at {}", input, dictionaryJobPath);
     if (maxNGramSize == 1) {
       startWordCounting(input, dictionaryJobPath, baseConf, minSupport);
       dictionaryChunks =

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java Sun Jun  9 12:17:45 2013
@@ -17,8 +17,6 @@
 
 package org.apache.mahout.vectorizer;
 
-import java.util.List;
-
 import org.apache.commons.cli2.CommandLine;
 import org.apache.commons.cli2.Group;
 import org.apache.commons.cli2.Option;
@@ -45,121 +43,123 @@ import org.apache.mahout.vectorizer.tfid
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 /**
  * Converts a given set of sequence files into SparseVectors
  */
 public final class SparseVectorsFromSequenceFiles extends AbstractJob {
-  
+
   private static final Logger log = LoggerFactory.getLogger(SparseVectorsFromSequenceFiles.class);
-  
+
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new SparseVectorsFromSequenceFiles(), args);
   }
-  
+
   @Override
   public int run(String[] args) throws Exception {
     DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
     ArgumentBuilder abuilder = new ArgumentBuilder();
     GroupBuilder gbuilder = new GroupBuilder();
-    
+
     Option inputDirOpt = DefaultOptionCreator.inputOption().create();
-    
+
     Option outputDirOpt = DefaultOptionCreator.outputOption().create();
-    
+
     Option minSupportOpt = obuilder.withLongName("minSupport").withArgument(
-      abuilder.withName("minSupport").withMinimum(1).withMaximum(1).create()).withDescription(
-      "(Optional) Minimum Support. Default Value: 2").withShortName("s").create();
-    
+            abuilder.withName("minSupport").withMinimum(1).withMaximum(1).create()).withDescription(
+            "(Optional) Minimum Support. Default Value: 2").withShortName("s").create();
+
     Option analyzerNameOpt = obuilder.withLongName("analyzerName").withArgument(
-      abuilder.withName("analyzerName").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The class name of the analyzer").withShortName("a").create();
-    
+            abuilder.withName("analyzerName").withMinimum(1).withMaximum(1).create()).withDescription(
+            "The class name of the analyzer").withShortName("a").create();
+
     Option chunkSizeOpt = obuilder.withLongName("chunkSize").withArgument(
-      abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The chunkSize in MegaBytes. 100-10000 MB").withShortName("chunk").create();
-    
+            abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
+            "The chunkSize in MegaBytes. 100-10000 MB").withShortName("chunk").create();
+
     Option weightOpt = obuilder.withLongName("weight").withRequired(false).withArgument(
-      abuilder.withName("weight").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The kind of weight to use. Currently TF or TFIDF").withShortName("wt").create();
-    
+            abuilder.withName("weight").withMinimum(1).withMaximum(1).create()).withDescription(
+            "The kind of weight to use. Currently TF or TFIDF").withShortName("wt").create();
+
     Option minDFOpt = obuilder.withLongName("minDF").withRequired(false).withArgument(
-      abuilder.withName("minDF").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The minimum document frequency.  Default is 1").withShortName("md").create();
+            abuilder.withName("minDF").withMinimum(1).withMaximum(1).create()).withDescription(
+            "The minimum document frequency.  Default is 1").withShortName("md").create();
 
     Option maxDFPercentOpt = obuilder.withLongName("maxDFPercent").withRequired(false).withArgument(
-        abuilder.withName("maxDFPercent").withMinimum(1).withMaximum(1).create()).withDescription(
-        "The max percentage of docs for the DF.  Can be used to remove really high frequency terms."
-            + " Expressed as an integer between 0 and 100. Default is 99.  If maxDFSigma is also set, "
-            + "it will override this value.").withShortName("x").create();
+            abuilder.withName("maxDFPercent").withMinimum(1).withMaximum(1).create()).withDescription(
+            "The max percentage of docs for the DF.  Can be used to remove really high frequency terms."
+                    + " Expressed as an integer between 0 and 100. Default is 99.  If maxDFSigma is also set, "
+                    + "it will override this value.").withShortName("x").create();
 
     Option maxDFSigmaOpt = obuilder.withLongName("maxDFSigma").withRequired(false).withArgument(
-      abuilder.withName("maxDFSigma").withMinimum(1).withMaximum(1).create()).withDescription(
-      "What portion of the tf (tf-idf) vectors to be used, expressed in times the standard deviation (sigma) "
-          + "of the document frequencies of these vectors. Can be used to remove really high frequency terms."
-          + " Expressed as a double value. Good value to be specified is 3.0. In case the value is less than 0 "
-          + "no vectors will be filtered out. Default is -1.0.  Overrides maxDFPercent").withShortName("xs").create();
-    
+            abuilder.withName("maxDFSigma").withMinimum(1).withMaximum(1).create()).withDescription(
+            "What portion of the tf (tf-idf) vectors to be used, expressed in times the standard deviation (sigma) "
+                    + "of the document frequencies of these vectors. Can be used to remove really high frequency terms."
+                    + " Expressed as a double value. Good value to be specified is 3.0. In case the value is less than 0 "
+                    + "no vectors will be filtered out. Default is -1.0.  Overrides maxDFPercent").withShortName("xs").create();
+
     Option minLLROpt = obuilder.withLongName("minLLR").withRequired(false).withArgument(
-      abuilder.withName("minLLR").withMinimum(1).withMaximum(1).create()).withDescription(
-      "(Optional)The minimum Log Likelihood Ratio(Float)  Default is " + LLRReducer.DEFAULT_MIN_LLR)
-        .withShortName("ml").create();
-    
+            abuilder.withName("minLLR").withMinimum(1).withMaximum(1).create()).withDescription(
+            "(Optional)The minimum Log Likelihood Ratio(Float)  Default is " + LLRReducer.DEFAULT_MIN_LLR)
+            .withShortName("ml").create();
+
     Option numReduceTasksOpt = obuilder.withLongName("numReducers").withArgument(
-      abuilder.withName("numReducers").withMinimum(1).withMaximum(1).create()).withDescription(
-      "(Optional) Number of reduce tasks. Default Value: 1").withShortName("nr").create();
-    
+            abuilder.withName("numReducers").withMinimum(1).withMaximum(1).create()).withDescription(
+            "(Optional) Number of reduce tasks. Default Value: 1").withShortName("nr").create();
+
     Option powerOpt = obuilder.withLongName("norm").withRequired(false).withArgument(
-      abuilder.withName("norm").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The norm to use, expressed as either a float or \"INF\" if you want to use the Infinite norm.  "
-          + "Must be greater or equal to 0.  The default is not to normalize").withShortName("n").create();
-    
+            abuilder.withName("norm").withMinimum(1).withMaximum(1).create()).withDescription(
+            "The norm to use, expressed as either a float or \"INF\" if you want to use the Infinite norm.  "
+                    + "Must be greater or equal to 0.  The default is not to normalize").withShortName("n").create();
+
     Option logNormalizeOpt = obuilder.withLongName("logNormalize").withRequired(false)
-    .withDescription(
-      "(Optional) Whether output vectors should be logNormalize. If set true else false")
-    .withShortName("lnorm").create();
-    
+            .withDescription(
+                    "(Optional) Whether output vectors should be logNormalize. If set true else false")
+            .withShortName("lnorm").create();
+
     Option maxNGramSizeOpt = obuilder.withLongName("maxNGramSize").withRequired(false).withArgument(
-      abuilder.withName("ngramSize").withMinimum(1).withMaximum(1).create())
-        .withDescription(
-          "(Optional) The maximum size of ngrams to create"
-              + " (2 = bigrams, 3 = trigrams, etc) Default Value:1").withShortName("ng").create();
-    
+            abuilder.withName("ngramSize").withMinimum(1).withMaximum(1).create())
+            .withDescription(
+                    "(Optional) The maximum size of ngrams to create"
+                            + " (2 = bigrams, 3 = trigrams, etc) Default Value:1").withShortName("ng").create();
+
     Option sequentialAccessVectorOpt = obuilder.withLongName("sequentialAccessVector").withRequired(false)
-        .withDescription(
-          "(Optional) Whether output vectors should be SequentialAccessVectors. If set true else false")
-        .withShortName("seq").create();
-    
+            .withDescription(
+                    "(Optional) Whether output vectors should be SequentialAccessVectors. If set true else false")
+            .withShortName("seq").create();
+
     Option namedVectorOpt = obuilder.withLongName("namedVector").withRequired(false)
-    .withDescription(
-      "(Optional) Whether output vectors should be NamedVectors. If set true else false")
-    .withShortName("nv").create();
-    
+            .withDescription(
+                    "(Optional) Whether output vectors should be NamedVectors. If set true else false")
+            .withShortName("nv").create();
+
     Option overwriteOutput = obuilder.withLongName("overwrite").withRequired(false).withDescription(
-      "If set, overwrite the output directory").withShortName("ow").create();
+            "If set, overwrite the output directory").withShortName("ow").create();
     Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h")
-        .create();
-    
+            .create();
+
     Group group = gbuilder.withName("Options").withOption(minSupportOpt).withOption(analyzerNameOpt)
-        .withOption(chunkSizeOpt).withOption(outputDirOpt).withOption(inputDirOpt).withOption(minDFOpt)
-        .withOption(maxDFSigmaOpt).withOption(maxDFPercentOpt).withOption(weightOpt).withOption(powerOpt)
-        .withOption(minLLROpt).withOption(numReduceTasksOpt).withOption(maxNGramSizeOpt).withOption(overwriteOutput)
-        .withOption(helpOpt).withOption(sequentialAccessVectorOpt).withOption(namedVectorOpt)
-        .withOption(logNormalizeOpt)
-        .create();
+            .withOption(chunkSizeOpt).withOption(outputDirOpt).withOption(inputDirOpt).withOption(minDFOpt)
+            .withOption(maxDFSigmaOpt).withOption(maxDFPercentOpt).withOption(weightOpt).withOption(powerOpt)
+            .withOption(minLLROpt).withOption(numReduceTasksOpt).withOption(maxNGramSizeOpt).withOption(overwriteOutput)
+            .withOption(helpOpt).withOption(sequentialAccessVectorOpt).withOption(namedVectorOpt)
+            .withOption(logNormalizeOpt)
+            .create();
     try {
       Parser parser = new Parser();
       parser.setGroup(group);
       parser.setHelpOption(helpOpt);
       CommandLine cmdLine = parser.parse(args);
-      
+
       if (cmdLine.hasOption(helpOpt)) {
         CommandLineUtil.printHelp(group);
         return -1;
       }
-      
+
       Path inputDir = new Path((String) cmdLine.getValue(inputDirOpt));
       Path outputDir = new Path((String) cmdLine.getValue(outputDirOpt));
-      
+
       int chunkSize = 100;
       if (cmdLine.hasOption(chunkSizeOpt)) {
         chunkSize = Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
@@ -169,9 +169,9 @@ public final class SparseVectorsFromSequ
         String minSupportString = (String) cmdLine.getValue(minSupportOpt);
         minSupport = Integer.parseInt(minSupportString);
       }
-      
+
       int maxNGramSize = 1;
-      
+
       if (cmdLine.hasOption(maxNGramSizeOpt)) {
         try {
           maxNGramSize = Integer.parseInt(cmdLine.getValue(maxNGramSizeOpt).toString());
@@ -180,17 +180,17 @@ public final class SparseVectorsFromSequ
         }
       }
       log.info("Maximum n-gram size is: {}", maxNGramSize);
-      
+
       if (cmdLine.hasOption(overwriteOutput)) {
         HadoopUtil.delete(getConf(), outputDir);
       }
-      
+
       float minLLRValue = LLRReducer.DEFAULT_MIN_LLR;
       if (cmdLine.hasOption(minLLROpt)) {
         minLLRValue = Float.parseFloat(cmdLine.getValue(minLLROpt).toString());
       }
       log.info("Minimum LLR value: {}", minLLRValue);
-      
+
       int reduceTasks = 1;
       if (cmdLine.hasOption(numReduceTasksOpt)) {
         reduceTasks = Integer.parseInt(cmdLine.getValue(numReduceTasksOpt).toString());
@@ -205,9 +205,9 @@ public final class SparseVectorsFromSequ
         // you can't instantiate it
         AnalyzerUtils.createAnalyzer(analyzerClass);
       }
-      
+
       boolean processIdf;
-      
+
       if (cmdLine.hasOption(weightOpt)) {
         String wString = cmdLine.getValue(weightOpt).toString();
         if ("tf".equalsIgnoreCase(wString)) {
@@ -220,7 +220,7 @@ public final class SparseVectorsFromSequ
       } else {
         processIdf = true;
       }
-      
+
       int minDf = 1;
       if (cmdLine.hasOption(minDFOpt)) {
         minDf = Integer.parseInt(cmdLine.getValue(minDFOpt).toString());
@@ -233,7 +233,7 @@ public final class SparseVectorsFromSequ
       if (cmdLine.hasOption(maxDFSigmaOpt)) {
         maxDFSigma = Double.parseDouble(cmdLine.getValue(maxDFSigmaOpt).toString());
       }
-      
+
       float norm = PartialVectorMerger.NO_NORMALIZING;
       if (cmdLine.hasOption(powerOpt)) {
         String power = cmdLine.getValue(powerOpt).toString();
@@ -243,12 +243,12 @@ public final class SparseVectorsFromSequ
           norm = Float.parseFloat(power);
         }
       }
-      
+
       boolean logNormalize = false;
       if (cmdLine.hasOption(logNormalizeOpt)) {
         logNormalize = true;
       }
-
+      log.info("Tokenizing documents in {}", inputDir);
       Configuration conf = getConf();
       Path tokenizedPath = new Path(outputDir, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER);
       //TODO: move this into DictionaryVectorizer , and then fold SparseVectorsFrom with EncodedVectorsFrom
@@ -264,98 +264,99 @@ public final class SparseVectorsFromSequ
       if (cmdLine.hasOption(namedVectorOpt)) {
         namedVectors = true;
       }
-      boolean shouldPrune = maxDFSigma >= 0.0  || maxDFPercent > 0.00;
+      boolean shouldPrune = maxDFSigma >= 0.0 || maxDFPercent > 0.00;
       String tfDirName = shouldPrune
-          ? DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER + "-toprune"
-          : DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER;
-
+              ? DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER + "-toprune"
+              : DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER;
+      log.info("Creating Term Frequency Vectors");
       if (processIdf) {
         DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath,
-                                                        outputDir,
-                                                        tfDirName,
-                                                        conf,
-                                                        minSupport,
-                                                        maxNGramSize,
-                                                        minLLRValue,
-                                                        -1.0f,
-                                                        false,
-                                                        reduceTasks,
-                                                        chunkSize,
-                                                        sequentialAccessOutput,
-                                                        namedVectors);
+                outputDir,
+                tfDirName,
+                conf,
+                minSupport,
+                maxNGramSize,
+                minLLRValue,
+                -1.0f,
+                false,
+                reduceTasks,
+                chunkSize,
+                sequentialAccessOutput,
+                namedVectors);
       } else {
         DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath,
-                                                        outputDir,
-                                                        tfDirName,
-                                                        conf,
-                                                        minSupport,
-                                                        maxNGramSize,
-                                                        minLLRValue,
-                                                        norm,
-                                                        logNormalize,
-                                                        reduceTasks,
-                                                        chunkSize,
-                                                        sequentialAccessOutput,
-                                                        namedVectors);
+                outputDir,
+                tfDirName,
+                conf,
+                minSupport,
+                maxNGramSize,
+                minLLRValue,
+                norm,
+                logNormalize,
+                reduceTasks,
+                chunkSize,
+                sequentialAccessOutput,
+                namedVectors);
       }
 
       Pair<Long[], List<Path>> docFrequenciesFeatures = null;
       // Should document frequency features be processed
       if (shouldPrune || processIdf) {
+        log.info("Calculating IDF");
         docFrequenciesFeatures =
-            TFIDFConverter.calculateDF(new Path(outputDir, tfDirName),outputDir, conf, chunkSize);
+                TFIDFConverter.calculateDF(new Path(outputDir, tfDirName), outputDir, conf, chunkSize);
       }
 
       long maxDF = maxDFPercent; //if we are pruning by std dev, then this will get changed
       if (shouldPrune) {
-	long vectorCount = docFrequenciesFeatures.getFirst()[1];
-	if (maxDFSigma >= 0.0) {
-	  Path dfDir = new Path(outputDir, TFIDFConverter.WORDCOUNT_OUTPUT_FOLDER);
-	  Path stdCalcDir = new Path(outputDir, HighDFWordsPruner.STD_CALC_DIR);
-
-	  // Calculate the standard deviation
-	  double stdDev = BasicStats.stdDevForGivenMean(dfDir, stdCalcDir, 0.0, conf);
-	  maxDF = (int) (100.0 * maxDFSigma * stdDev / vectorCount);
-	}
+        long vectorCount = docFrequenciesFeatures.getFirst()[1];
+        if (maxDFSigma >= 0.0) {
+          Path dfDir = new Path(outputDir, TFIDFConverter.WORDCOUNT_OUTPUT_FOLDER);
+          Path stdCalcDir = new Path(outputDir, HighDFWordsPruner.STD_CALC_DIR);
+
+          // Calculate the standard deviation
+          double stdDev = BasicStats.stdDevForGivenMean(dfDir, stdCalcDir, 0.0, conf);
+          maxDF = (int) (100.0 * maxDFSigma * stdDev / vectorCount);
+        }
 
-	long maxDFThreshold = (long) (vectorCount * (maxDF / 100.0f));
+        long maxDFThreshold = (long) (vectorCount * (maxDF / 100.0f));
 
         // Prune the term frequency vectors
         Path tfDir = new Path(outputDir, tfDirName);
         Path prunedTFDir = new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER);
         Path prunedPartialTFDir =
-            new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER + "-partial");
-
+                new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER + "-partial");
+        log.info("Pruning");
         if (processIdf) {
           HighDFWordsPruner.pruneVectors(tfDir,
-                                         prunedTFDir,
-                                         prunedPartialTFDir,
-                                         maxDFThreshold,
-					 minDf,
-                                         conf,
-                                         docFrequenciesFeatures,
-                                         -1.0f,
-                                         false,
-                                         reduceTasks);
+                  prunedTFDir,
+                  prunedPartialTFDir,
+                  maxDFThreshold,
+                  minDf,
+                  conf,
+                  docFrequenciesFeatures,
+                  -1.0f,
+                  false,
+                  reduceTasks);
         } else {
           HighDFWordsPruner.pruneVectors(tfDir,
-                                         prunedTFDir,
-                                         prunedPartialTFDir,
-                                         maxDFThreshold,
-					 minDf,
-                                         conf,
-                                         docFrequenciesFeatures,
-                                         norm,
-                                         logNormalize,
-                                         reduceTasks);
+                  prunedTFDir,
+                  prunedPartialTFDir,
+                  maxDFThreshold,
+                  minDf,
+                  conf,
+                  docFrequenciesFeatures,
+                  norm,
+                  logNormalize,
+                  reduceTasks);
         }
         HadoopUtil.delete(new Configuration(conf), tfDir);
       }
       if (processIdf) {
         TFIDFConverter.processTfIdf(
-               new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER),
-               outputDir, conf, docFrequenciesFeatures, minDf, maxDF, norm, logNormalize,
-               sequentialAccessOutput, namedVectors, reduceTasks);
+                new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER),
+                outputDir, conf, docFrequenciesFeatures, minDf, maxDF, norm, logNormalize,
+                sequentialAccessOutput, namedVectors, reduceTasks);
       }
     } catch (OptionException e) {
       log.error("Exception", e);
@@ -363,5 +364,5 @@ public final class SparseVectorsFromSequ
     }
     return 0;
   }
-  
+
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java Sun Jun  9 12:17:45 2013
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditi
 import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -40,6 +42,8 @@ import org.apache.mahout.math.VectorWrit
 import org.apache.mahout.math.map.OpenObjectIntHashMap;
 import org.apache.mahout.vectorizer.DictionaryVectorizer;
 import org.apache.mahout.vectorizer.common.PartialVectorMerger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
@@ -49,7 +53,7 @@ import java.util.Iterator;
  * Converts a document in to a sparse vector
  */
 public class TFPartialVectorReducer extends Reducer<Text, StringTuple, Text, VectorWritable> {
-
+  private transient static Logger log = LoggerFactory.getLogger(TFPartialVectorReducer.class);
   private final OpenObjectIntHashMap<String> dictionary = new OpenObjectIntHashMap<String>();
 
   private int dimension;
@@ -62,7 +66,7 @@ public class TFPartialVectorReducer exte
 
   @Override
   protected void reduce(Text key, Iterable<StringTuple> values, Context context)
-    throws IOException, InterruptedException {
+          throws IOException, InterruptedException {
     Iterator<StringTuple> it = values.iterator();
     if (!it.hasNext()) {
       return;
@@ -119,7 +123,18 @@ public class TFPartialVectorReducer exte
     Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
     Preconditions.checkArgument(localFiles != null && localFiles.length >= 1,
             "missing paths from the DistributedCache");
-
+    LocalFileSystem localFs = FileSystem.getLocal(conf);
+    if (!localFs.exists(localFiles[0])) {
+      log.info("Can't find dictionary dist. cache file, looking in .getCacheFiles");
+      URI[] filesURIs = DistributedCache.getCacheFiles(conf);
+      if (filesURIs == null) {
+        throw new IOException("Cannot read Frequency list from Distributed Cache");
+      }
+      if (filesURIs.length != 1) {
+        throw new IOException("Cannot read Frequency list from Distributed Cache (" + localFiles.length + ')');
+      }
+      localFiles[0] = new Path(filesURIs[0].getPath());
+    }
     dimension = conf.getInt(PartialVectorMerger.DIMENSION, Integer.MAX_VALUE);
     sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
     namedVector = conf.getBoolean(PartialVectorMerger.NAMED_VECTOR, false);
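
The fallback added above handles Hadoop local mode, where getLocalCacheFiles() can return paths that were never materialized on the local file system; in that case the reducer falls back to the original cache URIs. A condensed sketch of the pattern; the driver-side setCacheFiles call is an assumption about how the dictionary chunk is published (it is not part of this commit), and the same fallback is added to TFIDFPartialVectorReducer below:

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;

    public final class DistributedCacheFallbackSketch {

      // driver side (assumed, for illustration): publish the dictionary chunk on the cache
      static void publish(Configuration conf) {
        Path dictionaryChunk = new Path("/tmp/vectors/dictionary.file-0");   // hypothetical path
        DistributedCache.setCacheFiles(new URI[] {dictionaryChunk.toUri()}, conf);
      }

      // task side: prefer the localized copy, fall back to the original URI in local mode
      static Path resolve(Configuration conf) throws IOException {
        Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
        LocalFileSystem localFs = FileSystem.getLocal(conf);
        if (localFiles != null && localFiles.length >= 1 && localFs.exists(localFiles[0])) {
          return localFiles[0];                                              // normal cluster case
        }
        URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
        if (cacheFiles == null || cacheFiles.length != 1) {
          throw new IOException("Cannot read the dictionary from the DistributedCache");
        }
        return new Path(cacheFiles[0].getPath());
      }
    }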

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java Sun Jun  9 12:17:45 2013
@@ -24,6 +24,8 @@ import java.util.Iterator;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -103,7 +105,17 @@ public class TFIDFPartialVectorReducer e
     Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
     Preconditions.checkArgument(localFiles != null && localFiles.length >= 1, 
         "missing paths from the DistributedCache");
-
+    LocalFileSystem localFs = FileSystem.getLocal(conf);
+    if (!localFs.exists(localFiles[0])) {
+      URI[] filesURIs = DistributedCache.getCacheFiles(conf);
+      if (filesURIs == null) {
+        throw new IOException("Cannot read Frequency list from Distributed Cache");
+      }
+      if (filesURIs.length != 1) {
+        throw new IOException("Cannot read Frequency list from Distributed Cache (" + localFiles.length + ')');
+      }
+      localFiles[0] = new Path(filesURIs[0].getPath());
+    }
     vectorCount = conf.getLong(TFIDFConverter.VECTOR_COUNT, 1);
     featureCount = conf.getLong(TFIDFConverter.FEATURE_COUNT, 1);
     minDf = conf.getInt(TFIDFConverter.MIN_DF, 1);

Modified: mahout/trunk/examples/bin/cluster-reuters.sh
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/bin/cluster-reuters.sh?rev=1491191&r1=1491190&r2=1491191&view=diff
==============================================================================
--- mahout/trunk/examples/bin/cluster-reuters.sh (original)
+++ mahout/trunk/examples/bin/cluster-reuters.sh Sun Jun  9 12:17:45 2013
@@ -90,15 +90,19 @@ if [ ! -e ${WORK_DIR}/reuters-out-seqdir
       tar xzf ${WORK_DIR}/reuters21578.tar.gz -C ${WORK_DIR}/reuters-sgm
     fi
   
+    echo "Extracting Reuters"
+    $MAHOUT org.apache.lucene.benchmark.utils.ExtractReuters ${WORK_DIR}/reuters-sgm ${WORK_DIR}/reuters-out
     if [ "$HADOOP_HOME" != "" ] && [ "$MAHOUT_LOCAL" == "" ] ; then
+        echo "Copying Reuters data to Hadoop"
         set +e
         $HADOOP dfs -rmr ${WORK_DIR}/reuters-sgm
+        $HADOOP dfs -rmr ${WORK_DIR}/reuters-out
         set -e
-        $HADOOP dfs -put ${WORK_DIR}/reuters-sgm ${WORK_DIR}/reuters-sgm 
-    fi 
-    $MAHOUT org.apache.lucene.benchmark.utils.ExtractReuters ${WORK_DIR}/reuters-sgm ${WORK_DIR}/reuters-out
+        $HADOOP dfs -put ${WORK_DIR}/reuters-sgm ${WORK_DIR}/reuters-sgm
+        $HADOOP dfs -put ${WORK_DIR}/reuters-out ${WORK_DIR}/reuters-out
+    fi
   fi
-
+  echo "Converting to Sequence Files from Directory"
   $MAHOUT seqdirectory -i ${WORK_DIR}/reuters-out -o ${WORK_DIR}/reuters-out-seqdir -c UTF-8 -chunk 5
 fi