You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text rendering.)
Posted to commits@mahout.apache.org by sm...@apache.org on 2013/11/18 18:54:35 UTC

svn commit: r1543095 - in /mahout/trunk: CHANGELOG core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java

Author: smarthi
Date: Mon Nov 18 17:54:35 2013
New Revision: 1543095

URL: http://svn.apache.org/r1543095
Log:
MAHOUT-1358: StreamingKMeansReducer throws IllegalArgumentException when REDUCE_STREAMING_KMEANS is set to true

Modified:
    mahout/trunk/CHANGELOG
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java

Modified: mahout/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1543095&r1=1543094&r2=1543095&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Mon Nov 18 17:54:35 2013
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 0.9 - unreleased
 
+  MAHOUT-1358: StreamingKMeansThread throws IllegalArgumentException when REDUCE_STREAMING_KMEANS is set to true (smarthi)
+
   MAHOUT-1355: InteractionValueEncoder produces wrong traceDictionary entries (Johannes Schulte via smarthi)
 
   MAHOUT-1351: Adding DenseVector support to AbstractCluster (David DeBarr via smarthi)

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java?rev=1543095&r1=1543094&r2=1543095&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java Mon Nov 18 17:54:35 2013
@@ -21,6 +21,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -30,20 +31,24 @@ import org.apache.mahout.common.iterator
 import org.apache.mahout.math.Centroid;
 import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.neighborhood.UpdatableSearcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StreamingKMeansThread implements Callable<Iterable<Centroid>> {
+  private static final Logger log = LoggerFactory.getLogger(StreamingKMeansThread.class);
+
   private static final int NUM_ESTIMATE_POINTS = 1000;
 
   private final Configuration conf;
-  private final Iterable<Centroid> datapoints;
+  private final Iterable<Centroid> dataPoints;
 
   public StreamingKMeansThread(Path input, Configuration conf) {
     this(StreamingKMeansUtilsMR.getCentroidsFromVectorWritable(
         new SequenceFileValueIterable<VectorWritable>(input, false, conf)), conf);
   }
 
-  public StreamingKMeansThread(Iterable<Centroid> datapoints, Configuration conf) {
-    this.datapoints = datapoints;
+  public StreamingKMeansThread(Iterable<Centroid> dataPoints, Configuration conf) {
+    this.dataPoints = dataPoints;
     this.conf = conf;
   }
 
@@ -54,22 +59,31 @@ public class StreamingKMeansThread imple
     double estimateDistanceCutoff = conf.getFloat(StreamingKMeansDriver.ESTIMATED_DISTANCE_CUTOFF,
         StreamingKMeansDriver.INVALID_DISTANCE_CUTOFF);
 
-    Iterator<Centroid> datapointsIterator = datapoints.iterator();
+    Iterator<Centroid> dataPointsIterator = dataPoints.iterator();
+    List<Centroid> dataPointsList = Lists.newArrayList();
     if (estimateDistanceCutoff == StreamingKMeansDriver.INVALID_DISTANCE_CUTOFF) {
       List<Centroid> estimatePoints = Lists.newArrayListWithExpectedSize(NUM_ESTIMATE_POINTS);
-      while (datapointsIterator.hasNext() && estimatePoints.size() < NUM_ESTIMATE_POINTS) {
-        estimatePoints.add(datapointsIterator.next());
+      while (dataPointsIterator.hasNext() && estimatePoints.size() < NUM_ESTIMATE_POINTS) {
+        Centroid centroid = dataPointsIterator.next();
+        estimatePoints.add(centroid);
+        dataPointsList.add(centroid);
+      }
+
+      if (log.isInfoEnabled()) {
+        log.info("Estimated Points: {}", estimatePoints.size());
       }
       estimateDistanceCutoff = ClusteringUtils.estimateDistanceCutoff(estimatePoints, searcher.getDistanceMeasure());
-    }
 
-    StreamingKMeans clusterer = new StreamingKMeans(searcher, numClusters, estimateDistanceCutoff);
-    while (datapointsIterator.hasNext()) {
-      clusterer.cluster(datapointsIterator.next());
+    } else {
+      Iterators.addAll(dataPointsList, dataPointsIterator);
     }
-    clusterer.reindexCentroids();
 
-    return clusterer;
+    StreamingKMeans streamingKMeans = new StreamingKMeans(searcher, numClusters, estimateDistanceCutoff);
+    for (Centroid aDataPoints : dataPointsList) {
+      streamingKMeans.cluster(aDataPoints);
+    }
+    streamingKMeans.reindexCentroids();
+    return streamingKMeans;
   }
 
 }