You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2013/11/18 18:54:35 UTC
svn commit: r1543095 - in /mahout/trunk: CHANGELOG
core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java
Author: smarthi
Date: Mon Nov 18 17:54:35 2013
New Revision: 1543095
URL: http://svn.apache.org/r1543095
Log:
MAHOUT-1358: StreamingKMeansThread throws IllegalArgumentException when REDUCE_STREAMING_KMEANS is set to true
Modified:
mahout/trunk/CHANGELOG
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java
Modified: mahout/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1543095&r1=1543094&r2=1543095&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Mon Nov 18 17:54:35 2013
@@ -2,6 +2,8 @@ Mahout Change Log
Release 0.9 - unreleased
+ MAHOUT-1358: StreamingKMeansThread throws IllegalArgumentException when REDUCE_STREAMING_KMEANS is set to true (smarthi)
+
MAHOUT-1355: InteractionValueEncoder produces wrong traceDictionary entries (Johannes Schulte via smarthi)
MAHOUT-1351: Adding DenseVector support to AbstractCluster (David DeBarr via smarthi)
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java?rev=1543095&r1=1543094&r2=1543095&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java Mon Nov 18 17:54:35 2013
@@ -21,6 +21,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -30,20 +31,24 @@ import org.apache.mahout.common.iterator
import org.apache.mahout.math.Centroid;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.neighborhood.UpdatableSearcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class StreamingKMeansThread implements Callable<Iterable<Centroid>> {
+ private static final Logger log = LoggerFactory.getLogger(StreamingKMeansThread.class);
+
private static final int NUM_ESTIMATE_POINTS = 1000;
private final Configuration conf;
- private final Iterable<Centroid> datapoints;
+ private final Iterable<Centroid> dataPoints;
public StreamingKMeansThread(Path input, Configuration conf) {
this(StreamingKMeansUtilsMR.getCentroidsFromVectorWritable(
new SequenceFileValueIterable<VectorWritable>(input, false, conf)), conf);
}
- public StreamingKMeansThread(Iterable<Centroid> datapoints, Configuration conf) {
- this.datapoints = datapoints;
+ public StreamingKMeansThread(Iterable<Centroid> dataPoints, Configuration conf) {
+ this.dataPoints = dataPoints;
this.conf = conf;
}
@@ -54,22 +59,31 @@ public class StreamingKMeansThread imple
double estimateDistanceCutoff = conf.getFloat(StreamingKMeansDriver.ESTIMATED_DISTANCE_CUTOFF,
StreamingKMeansDriver.INVALID_DISTANCE_CUTOFF);
- Iterator<Centroid> datapointsIterator = datapoints.iterator();
+ Iterator<Centroid> dataPointsIterator = dataPoints.iterator();
+ List<Centroid> dataPointsList = Lists.newArrayList();
if (estimateDistanceCutoff == StreamingKMeansDriver.INVALID_DISTANCE_CUTOFF) {
List<Centroid> estimatePoints = Lists.newArrayListWithExpectedSize(NUM_ESTIMATE_POINTS);
- while (datapointsIterator.hasNext() && estimatePoints.size() < NUM_ESTIMATE_POINTS) {
- estimatePoints.add(datapointsIterator.next());
+ while (dataPointsIterator.hasNext() && estimatePoints.size() < NUM_ESTIMATE_POINTS) {
+ Centroid centroid = dataPointsIterator.next();
+ estimatePoints.add(centroid);
+ dataPointsList.add(centroid);
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Estimated Points: {}", estimatePoints.size());
}
estimateDistanceCutoff = ClusteringUtils.estimateDistanceCutoff(estimatePoints, searcher.getDistanceMeasure());
- }
- StreamingKMeans clusterer = new StreamingKMeans(searcher, numClusters, estimateDistanceCutoff);
- while (datapointsIterator.hasNext()) {
- clusterer.cluster(datapointsIterator.next());
+ } else {
+ Iterators.addAll(dataPointsList, dataPointsIterator);
}
- clusterer.reindexCentroids();
- return clusterer;
+ StreamingKMeans streamingKMeans = new StreamingKMeans(searcher, numClusters, estimateDistanceCutoff);
+ for (Centroid aDataPoints : dataPointsList) {
+ streamingKMeans.cluster(aDataPoints);
+ }
+ streamingKMeans.reindexCentroids();
+ return streamingKMeans;
}
}