You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/04 14:29:30 UTC
[28/53] [abbrv] [partial] mahout git commit: end of day 6-2-2018
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/VectorMatrixMultiplicationJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/VectorMatrixMultiplicationJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/VectorMatrixMultiplicationJob.java
new file mode 100644
index 0000000..c42ab70
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/VectorMatrixMultiplicationJob.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+/**
+ * <p>This class handles the three-way multiplication of the diagonal matrix
+ * and the Markov transition matrix inherent in the Eigencuts algorithm.
+ * The equation takes the form:</p>
+ *
+ * {@code W = D^(1/2) * M * D^(1/2)}
+ *
+ * <p>Since the diagonal matrix D has only n non-zero elements, it is represented
+ * as a dense vector in this job, rather than a full n-by-n matrix. This job
+ * performs the multiplications and returns the new DRM.
+ */
+public final class VectorMatrixMultiplicationJob {
+
+ // Utility class: all entry points are static, so no instances are needed.
+ private VectorMatrixMultiplicationJob() {
+ }
+
+ /**
+ * Invokes the job, defaulting the DRM's temporary directory to
+ * {@code <outputPath>/tmp}.
+ * @param markovPath Path to the markov DRM's sequence files
+ * @param diag the diagonal of D, as a vector of length n
+ * @param outputPath directory that will hold the resulting DRM's sequence files
+ * @return the product W = D^(1/2) * M * D^(1/2) as a DistributedRowMatrix
+ */
+ public static DistributedRowMatrix runJob(Path markovPath, Vector diag, Path outputPath)
+ throws IOException, ClassNotFoundException, InterruptedException {
+
+ return runJob(markovPath, diag, outputPath, new Path(outputPath, "tmp"));
+ }
+
+ /**
+ * Runs the map-only job that rescales every entry m_ij of the Markov matrix
+ * to sqrt(d_ii) * m_ij * sqrt(d_jj).
+ * @param markovPath Path to the markov DRM's sequence files
+ * @param diag the diagonal of D; serialized for the mappers via VectorCache
+ * @param outputPath directory that will hold the resulting DRM's sequence files
+ * @param tmpPath temporary directory handed to the returned DistributedRowMatrix
+ * @return the product W as a diag.size() x diag.size() DistributedRowMatrix
+ * @throws IllegalStateException if the MapReduce job fails
+ */
+ public static DistributedRowMatrix runJob(Path markovPath, Vector diag, Path outputPath, Path tmpPath)
+ throws IOException, ClassNotFoundException, InterruptedException {
+
+ // set up the serialization of the diagonal vector
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(markovPath.toUri(), conf);
+ markovPath = fs.makeQualified(markovPath);
+ outputPath = fs.makeQualified(outputPath);
+ // NOTE(review): the vector is written to a sibling of outputPath (the
+ // "vector" child of its parent) — confirm this cannot collide with other
+ // jobs sharing the same parent directory.
+ Path vectorOutputPath = new Path(outputPath.getParent(), "vector");
+ VectorCache.save(new IntWritable(Keys.DIAGONAL_CACHE_INDEX), diag, vectorOutputPath, conf);
+
+ // set up the job itself: sequence files in and out, map-only (no reducers)
+ Job job = new Job(conf, "VectorMatrixMultiplication");
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(VectorWritable.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setMapperClass(VectorMatrixMultiplicationMapper.class);
+ job.setNumReduceTasks(0);
+
+ FileInputFormat.addInputPath(job, markovPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ job.setJarByClass(VectorMatrixMultiplicationJob.class);
+
+ boolean succeeded = job.waitForCompletion(true);
+ if (!succeeded) {
+ throw new IllegalStateException("Job failed!");
+ }
+
+ // build the resulting DRM from the results; W is square with side diag.size()
+ return new DistributedRowMatrix(outputPath, tmpPath,
+ diag.size(), diag.size());
+ }
+
+ /**
+ * Mapper that scales one row of M at a time: each element m_ij becomes
+ * sqrt(d_ii) * m_ij * sqrt(d_jj), where d is the cached diagonal vector.
+ */
+ public static class VectorMatrixMultiplicationMapper
+ extends Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+ // diagonal of D, loaded once per task in setup()
+ private Vector diagonal;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ // read in the diagonal vector from the distributed cache
+ super.setup(context);
+ Configuration config = context.getConfiguration();
+ diagonal = VectorCache.load(config);
+ if (diagonal == null) {
+ throw new IOException("No vector loaded from cache!");
+ }
+ if (!(diagonal instanceof DenseVector)) {
+ // densify once up front: map() does random-access lookups per element
+ diagonal = new DenseVector(diagonal);
+ }
+ }
+
+ @Override
+ protected void map(IntWritable key, VectorWritable row, Context ctx)
+ throws IOException, InterruptedException {
+
+ // key is the row index i; all() visits every element of the row,
+ // including zeros (scaling a zero leaves it zero)
+ for (Vector.Element e : row.get().all()) {
+ double dii = Functions.SQRT.apply(diagonal.get(key.get()));
+ double djj = Functions.SQRT.apply(diagonal.get(e.index()));
+ double mij = e.get();
+ e.set(dii * mij * djj);
+ }
+ // the row vector is mutated in place and re-emitted under the same index
+ ctx.write(key, row);
+ }
+
+ /**
+ * Performs the setup of the Mapper. Used by unit tests.
+ * @param diag the diagonal vector to use in place of the distributed cache
+ */
+ void setup(Vector diag) {
+ this.diagonal = diag;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/VertexWritable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/VertexWritable.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/VertexWritable.java
new file mode 100644
index 0000000..0d70cac
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/VertexWritable.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Represents a vertex within the affinity graph for Eigencuts.
+ */
+public class VertexWritable implements Writable {
+
+ /** row index of this vertex */
+ private int row;
+
+ /** column index of this vertex */
+ private int col;
+
+ /** affinity value stored at (row, col) */
+ private double weight;
+
+ /** extra type delimiter; NOTE(review): write() throws NPE if this is
+ null, since DataOutput.writeUTF rejects null — confirm callers always
+ set it before serializing */
+ private String tag;
+
+ public VertexWritable() {
+ }
+
+ public VertexWritable(int i, int j, double v, String t) {
+ row = i;
+ col = j;
+ weight = v;
+ tag = t;
+ }
+
+ public int getRow() {
+ return row;
+ }
+
+ public void setRow(int i) {
+ row = i;
+ }
+
+ public int getCol() {
+ return col;
+ }
+
+ public void setCol(int j) {
+ col = j;
+ }
+
+ public double getValue() {
+ return weight;
+ }
+
+ public void setValue(double v) {
+ weight = v;
+ }
+
+ public String getType() {
+ return tag;
+ }
+
+ public void setType(String t) {
+ tag = t;
+ }
+
+ // Wire format (order matters): int row, int col, double value, UTF type.
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ row = in.readInt();
+ col = in.readInt();
+ weight = in.readDouble();
+ tag = in.readUTF();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(row);
+ out.writeInt(col);
+ out.writeDouble(weight);
+ out.writeUTF(tag);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java
new file mode 100644
index 0000000..3ce94dc
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.kmeans;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.kmeans.Kluster;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Given an Input Path containing a {@link org.apache.hadoop.io.SequenceFile}, select k vectors and write them to the
+ * output file as a {@link org.apache.mahout.clustering.kmeans.Kluster} representing the initial centroid to use. The
+ * selection criterion is the rows with max value in that respective column
+ */
+public final class EigenSeedGenerator {
+
+ private static final Logger log = LoggerFactory.getLogger(EigenSeedGenerator.class);
+
+ /** Configuration key naming the number of clusters. */
+ public static final String K = "k";
+
+ // Utility class: not instantiable.
+ private EigenSeedGenerator() {}
+
+ /**
+ * Selects k seed centroids from the eigenvector rows: for each column
+ * (eigenvector index), the row holding the largest absolute value in that
+ * column wins, and that row becomes the initial Kluster for that cluster id.
+ * @param conf the Hadoop configuration to use
+ * @param input a SequenceFile of row vectors, or a directory of such files
+ * @param output directory for the seed file; deleted and recreated here
+ * @param k the number of clusters, used to presize the tracking maps
+ * @param measure the DistanceMeasure each seed Kluster is constructed with
+ * @return the path of the written seed file, "part-eigenSeed"
+ */
+ public static Path buildFromEigens(Configuration conf, Path input, Path output, int k, DistanceMeasure measure)
+ throws IOException {
+ // delete the output directory
+ FileSystem fs = FileSystem.get(output.toUri(), conf);
+ HadoopUtil.delete(conf, output);
+ Path outFile = new Path(output, "part-eigenSeed");
+ boolean newFile = fs.createNewFile(outFile);
+ if (newFile) {
+ Path inputPathPattern;
+
+ if (fs.getFileStatus(input).isDir()) {
+ inputPathPattern = new Path(input, "*");
+ } else {
+ inputPathPattern = input;
+ }
+
+ FileStatus[] inputFiles = fs.globStatus(inputPathPattern, PathFilters.logsCRCFilter());
+ Map<Integer,Double> maxEigens = new HashMap<>(k); // max |value| seen so far in each column
+ Map<Integer,Text> chosenTexts = new HashMap<>(k);
+ Map<Integer,ClusterWritable> chosenClusters = new HashMap<>(k);
+
+ for (FileStatus fileStatus : inputFiles) {
+ if (!fileStatus.isDir()) {
+ for (Pair<Writable,VectorWritable> record : new SequenceFileIterable<Writable,VectorWritable>(
+ fileStatus.getPath(), true, conf)) {
+ Writable key = record.getFirst();
+ VectorWritable value = record.getSecond();
+
+ // this row "wins" a column when its |entry| beats the running max
+ for (Vector.Element e : value.get().nonZeroes()) {
+ int index = e.index();
+ double v = Math.abs(e.get());
+
+ if (!maxEigens.containsKey(index) || v > maxEigens.get(index)) {
+ maxEigens.put(index, v);
+ Text newText = new Text(key.toString());
+ chosenTexts.put(index, newText);
+ Kluster newCluster = new Kluster(value.get(), index, measure);
+ newCluster.observe(value.get(), 1);
+ ClusterWritable clusterWritable = new ClusterWritable();
+ clusterWritable.setValue(newCluster);
+ chosenClusters.put(index, clusterWritable);
+ }
+ }
+ }
+ }
+ }
+
+ // write one (row key, cluster) pair per column that produced a winner
+ try (SequenceFile.Writer writer =
+ SequenceFile.createWriter(fs, conf, outFile, Text.class, ClusterWritable.class)){
+ for (Integer key : maxEigens.keySet()) {
+ writer.append(chosenTexts.get(key), chosenClusters.get(key));
+ }
+ log.info("EigenSeedGenerator:: Wrote {} Klusters to {}", chosenTexts.size(), outFile);
+ }
+ }
+
+ return outFile;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
new file mode 100644
index 0000000..427de91
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.kmeans;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.kmeans.KMeansDriver;
+import org.apache.mahout.clustering.spectral.AffinityMatrixInputJob;
+import org.apache.mahout.clustering.spectral.MatrixDiagonalizeJob;
+import org.apache.mahout.clustering.spectral.UnitVectorizerJob;
+import org.apache.mahout.clustering.spectral.VectorMatrixMultiplicationJob;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Performs spectral k-means clustering on the top k eigenvectors of the input affinity matrix.
+ */
+public class SpectralKMeansDriver extends AbstractJob {
+ private static final Logger log = LoggerFactory.getLogger(SpectralKMeansDriver.class);
+
+ public static final int REDUCERS = 10;
+ public static final int BLOCKHEIGHT = 30000;
+ public static final int OVERSAMPLING = 15;
+ public static final int POWERITERS = 0;
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new SpectralKMeansDriver(), args);
+ }
+
+ @Override
+ public int run(String[] arg0) throws Exception {
+
+ Configuration conf = getConf();
+ addInputOption();
+ addOutputOption();
+ addOption("dimensions", "d", "Square dimensions of affinity matrix", true);
+ addOption("clusters", "k", "Number of clusters and top eigenvectors", true);
+ addOption(DefaultOptionCreator.distanceMeasureOption().create());
+ addOption(DefaultOptionCreator.convergenceOption().create());
+ addOption(DefaultOptionCreator.maxIterationsOption().create());
+ addOption(DefaultOptionCreator.overwriteOption().create());
+ addFlag("usessvd", "ssvd", "Uses SSVD as the eigensolver. Default is the Lanczos solver.");
+ addOption("reduceTasks", "t", "Number of reducers for SSVD", String.valueOf(REDUCERS));
+ addOption("outerProdBlockHeight", "oh", "Block height of outer products for SSVD", String.valueOf(BLOCKHEIGHT));
+ addOption("oversampling", "p", "Oversampling parameter for SSVD", String.valueOf(OVERSAMPLING));
+ addOption("powerIter", "q", "Additional power iterations for SSVD", String.valueOf(POWERITERS));
+
+ Map<String, List<String>> parsedArgs = parseArguments(arg0);
+ if (parsedArgs == null) {
+ return 0;
+ }
+
+ Path input = getInputPath();
+ Path output = getOutputPath();
+ if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+ HadoopUtil.delete(conf, getTempPath());
+ HadoopUtil.delete(conf, getOutputPath());
+ }
+ int numDims = Integer.parseInt(getOption("dimensions"));
+ int clusters = Integer.parseInt(getOption("clusters"));
+ String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+ DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+ double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+ int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+
+ Path tempdir = new Path(getOption("tempDir"));
+ int reducers = Integer.parseInt(getOption("reduceTasks"));
+ int blockheight = Integer.parseInt(getOption("outerProdBlockHeight"));
+ int oversampling = Integer.parseInt(getOption("oversampling"));
+ int poweriters = Integer.parseInt(getOption("powerIter"));
+ run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempdir, reducers,
+ blockheight, oversampling, poweriters);
+
+ return 0;
+ }
+
+ public static void run(Configuration conf, Path input, Path output, int numDims, int clusters,
+ DistanceMeasure measure, double convergenceDelta, int maxIterations, Path tempDir)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempDir, REDUCERS,
+ BLOCKHEIGHT, OVERSAMPLING, POWERITERS);
+ }
+
+ /**
+ * Run the Spectral KMeans clustering on the supplied arguments
+ *
+ * @param conf
+ * the Configuration to be used
+ * @param input
+ * the Path to the input tuples directory
+ * @param output
+ * the Path to the output directory
+ * @param numDims
+ * the int number of dimensions of the affinity matrix
+ * @param clusters
+ * the int number of eigenvectors and thus clusters to produce
+ * @param measure
+ * the DistanceMeasure for the k-Means calculations
+ * @param convergenceDelta
+ * the double convergence delta for the k-Means calculations
+ * @param maxIterations
+ * the int maximum number of iterations for the k-Means calculations
+ * @param tempDir
+ * Temporary directory for intermediate calculations
+ * @param numReducers
+ * Number of reducers
+ * @param blockHeight
+ * @param oversampling
+ * @param poweriters
+ */
+ public static void run(Configuration conf, Path input, Path output, int numDims, int clusters,
+ DistanceMeasure measure, double convergenceDelta, int maxIterations, Path tempDir,
+ int numReducers, int blockHeight, int oversampling, int poweriters)
+ throws IOException, InterruptedException, ClassNotFoundException {
+
+ HadoopUtil.delete(conf, tempDir);
+ Path outputCalc = new Path(tempDir, "calculations");
+ Path outputTmp = new Path(tempDir, "temporary");
+
+ // Take in the raw CSV text file and split it ourselves,
+ // creating our own SequenceFiles for the matrices to read later
+ // (similar to the style of syntheticcontrol.canopy.InputMapper)
+ Path affSeqFiles = new Path(outputCalc, "seqfile");
+ AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, numDims);
+
+ // Construct the affinity matrix using the newly-created sequence files
+ DistributedRowMatrix A = new DistributedRowMatrix(affSeqFiles, new Path(outputTmp, "afftmp"), numDims, numDims);
+
+ Configuration depConf = new Configuration(conf);
+ A.setConf(depConf);
+
+ // Construct the diagonal matrix D (represented as a vector)
+ Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims);
+
+ // Calculate the normalized Laplacian of the form: L = D^(-0.5)AD^(-0.5)
+ DistributedRowMatrix L = VectorMatrixMultiplicationJob.runJob(affSeqFiles, D, new Path(outputCalc, "laplacian"),
+ new Path(outputCalc, outputCalc));
+ L.setConf(depConf);
+
+ Path data;
+
+ // SSVD requires an array of Paths to function. So we pass in an array of length one
+ Path[] LPath = new Path[1];
+ LPath[0] = L.getRowPath();
+
+ Path SSVDout = new Path(outputCalc, "SSVD");
+
+ SSVDSolver solveIt = new SSVDSolver(depConf, LPath, SSVDout, blockHeight, clusters, oversampling, numReducers);
+
+ solveIt.setComputeV(false);
+ solveIt.setComputeU(true);
+ solveIt.setOverwrite(true);
+ solveIt.setQ(poweriters);
+ // solveIt.setBroadcast(false);
+ solveIt.run();
+ data = new Path(solveIt.getUPath());
+
+ // Normalize the rows of Wt to unit length
+ // normalize is important because it reduces the occurrence of two unique clusters combining into one
+ Path unitVectors = new Path(outputCalc, "unitvectors");
+
+ UnitVectorizerJob.runJob(data, unitVectors);
+
+ DistributedRowMatrix Wt = new DistributedRowMatrix(unitVectors, new Path(unitVectors, "tmp"), clusters, numDims);
+ Wt.setConf(depConf);
+ data = Wt.getRowPath();
+
+ // Generate initial clusters using EigenSeedGenerator which picks rows as centroids if that row contains max
+ // eigen value in that column
+ Path initialclusters = EigenSeedGenerator.buildFromEigens(conf, data,
+ new Path(output, Cluster.INITIAL_CLUSTERS_DIR), clusters, measure);
+
+ // Run the KMeansDriver
+ Path answer = new Path(output, "kmeans_out");
+ KMeansDriver.run(conf, data, initialclusters, answer, convergenceDelta, maxIterations, true, 0.0, false);
+
+ // Restore name to id mapping and read through the cluster assignments
+ Path mappingPath = new Path(new Path(conf.get("hadoop.tmp.dir")), "generic_input_mapping");
+ List<String> mapping = new ArrayList<>();
+ FileSystem fs = FileSystem.get(mappingPath.toUri(), conf);
+ if (fs.exists(mappingPath)) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, mappingPath, conf);
+ Text mappingValue = new Text();
+ IntWritable mappingIndex = new IntWritable();
+ while (reader.next(mappingIndex, mappingValue)) {
+ String s = mappingValue.toString();
+ mapping.add(s);
+ }
+ HadoopUtil.delete(conf, mappingPath);
+ } else {
+ log.warn("generic input mapping file not found!");
+ }
+
+ Path clusteredPointsPath = new Path(answer, "clusteredPoints");
+ Path inputPath = new Path(clusteredPointsPath, "part-m-00000");
+ int id = 0;
+ for (Pair<IntWritable, WeightedVectorWritable> record :
+ new SequenceFileIterable<IntWritable, WeightedVectorWritable>(inputPath, conf)) {
+ if (!mapping.isEmpty()) {
+ log.info("{}: {}", mapping.get(id++), record.getFirst().get());
+ } else {
+ log.info("{}: {}", id++, record.getFirst().get());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/BallKMeans.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/BallKMeans.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/BallKMeans.java
new file mode 100644
index 0000000..25806fe
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/BallKMeans.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.cluster;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import org.apache.mahout.clustering.ClusteringUtils;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.Centroid;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.WeightedVector;
+import org.apache.mahout.math.neighborhood.UpdatableSearcher;
+import org.apache.mahout.math.random.Multinomial;
+import org.apache.mahout.math.random.WeightedThing;
+
+/**
+ * Implements a ball k-means algorithm for weighted vectors with probabilistic seeding similar to k-means++.
+ * The idea is that k-means++ gives good starting clusters and ball k-means can tune up the final result very nicely
+ * in only a few passes (or even in a single iteration for well-clusterable data).
+ *
+ * A good reference for this class of algorithms is "The Effectiveness of Lloyd-Type Methods for the k-Means Problem"
+ * by Rafail Ostrovsky, Yuval Rabani, Leonard J. Schulman and Chaitanya Swamy. The code here uses the seeding strategy
+ * as described in section 4.1.1 of that paper and the ball k-means step as described in section 4.2. We support
+ * multiple iterations in contrast to the algorithm described in the paper.
+ */
+public class BallKMeans implements Iterable<Centroid> {
+ /**
+ * The searcher containing the centroids.
+ */
+ private final UpdatableSearcher centroids;
+
+ /**
+ * The number of clusters to cluster the data into.
+ */
+ private final int numClusters;
+
+ /**
+ * The maximum number of iterations of the algorithm to run waiting for the cluster assignments
+ * to stabilize. If there are no changes in cluster assignment earlier, we can finish early.
+ */
+ private final int maxNumIterations;
+
+ /**
+ * When deciding which points to include in the new centroid calculation,
+ * it's preferable to exclude outliers since it increases the rate of convergence.
+ * So, we calculate the distance from each cluster to its closest neighboring cluster. When
+ * evaluating the points assigned to a cluster, we compare the distance between the centroid to
+ * the point with the distance between the centroid and its closest centroid neighbor
+ * multiplied by this trimFraction. If the distance between the centroid and the point is
+ * greater, we consider it an outlier and we don't use it.
+ */
+ private final double trimFraction;
+
+ /**
+ * Selecting the initial centroids is the most important part of the ball k-means clustering. Poor choices, like two
+ * centroids in the same actual cluster result in a low-quality final result.
+ * k-means++ initialization yields good quality clusters, especially when using BallKMeans after StreamingKMeans as
+ * the points have weights.
+ * Simple, random selection of the points based on their weights is faster but sometimes fails to produce the
+ * desired number of clusters.
+ * This field is true if the initialization should be done with k-means++.
+ */
+ private final boolean kMeansPlusPlusInit;
+
+ /**
+ * When using trimFraction, the weight of each centroid will not be the sum of the weights of
+ * the vectors assigned to that cluster because outliers are not used to compute the updated
+ * centroid.
+ * So, the total weight is probably wrong. This can be fixed by doing another pass over the
+ * data points and adjusting the weights of each centroid. This doesn't update the coordinates
+ * of the centroids, but is useful if the weights matter.
+ */
+ private final boolean correctWeights;
+
+ /**
+ * When running multiple ball k-means passes to get the one with the smallest total cost, can compute the
+ * overall cost, using all the points for clustering, or reserve a fraction of them, testProbability in a test set.
+ * The cost is the sum of the distances between each point and its corresponding centroid.
+ * We then use this set of points to compute the total cost on. We're therefore trying to select the clustering
+ * that best describes the underlying distribution of the clusters.
+ * This field is the probability of assigning a given point to the test set. If this is 0, the cost will be computed
+ * on the entire set of points.
+ */
+ private final double testProbability;
+
+ /**
+ * Whether or not testProbability > 0, i.e., there exists a non-empty 'test' set.
+ */
+ private final boolean splitTrainTest;
+
+ /**
+ * How many k-means runs to have. If there's more than one run, we compute the cost of each clustering as described
+ * above and select the clustering that minimizes the cost.
+ * Multiple runs are a lot more useful when using the random initialization. With kmeans++, 1-2 runs are enough and
+ * more runs are not likely to help quality much.
+ */
+ private final int numRuns;
+
+ /**
+ * Random object to sample values from.
+ */
+ private final Random random;
+
+ public BallKMeans(UpdatableSearcher searcher, int numClusters, int maxNumIterations) {
+ // By default, the trimFraction is 0.9, k-means++ is used, the weights will be corrected at the end,
+ // there will be 0 points in the test set and 1 run.
+ this(searcher, numClusters, maxNumIterations, 0.9, true, true, 0.0, 1);
+ }
+
+ public BallKMeans(UpdatableSearcher searcher, int numClusters, int maxNumIterations,
+ boolean kMeansPlusPlusInit, int numRuns) {
+ // By default, the trimFraction is 0.9, k-means++ is used, the weights will be corrected at the end,
+ // there will be 10% points of in the test set.
+ this(searcher, numClusters, maxNumIterations, 0.9, kMeansPlusPlusInit, true, 0.1, numRuns);
+ }
+
+ public BallKMeans(UpdatableSearcher searcher, int numClusters, int maxNumIterations,
+ double trimFraction, boolean kMeansPlusPlusInit, boolean correctWeights,
+ double testProbability, int numRuns) {
+ Preconditions.checkArgument(searcher.size() == 0, "Searcher must be empty initially to populate with centroids");
+ Preconditions.checkArgument(numClusters > 0, "The requested number of clusters must be positive");
+ Preconditions.checkArgument(maxNumIterations > 0, "The maximum number of iterations must be positive");
+ Preconditions.checkArgument(trimFraction > 0, "The trim fraction must be positive");
+ Preconditions.checkArgument(testProbability >= 0 && testProbability < 1, "The testProbability must be in [0, 1)");
+ Preconditions.checkArgument(numRuns > 0, "There has to be at least one run");
+
+ this.centroids = searcher;
+ this.numClusters = numClusters;
+ this.maxNumIterations = maxNumIterations;
+
+ this.trimFraction = trimFraction;
+ this.kMeansPlusPlusInit = kMeansPlusPlusInit;
+ this.correctWeights = correctWeights;
+
+ this.testProbability = testProbability;
+ this.splitTrainTest = testProbability > 0;
+ this.numRuns = numRuns;
+
+ this.random = RandomUtils.getRandom();
+ }
+
+ public Pair<List<? extends WeightedVector>, List<? extends WeightedVector>> splitTrainTest(
+ List<? extends WeightedVector> datapoints) {
+ // If there will be no points assigned to the test set, return now.
+ if (testProbability == 0) {
+ return new Pair<List<? extends WeightedVector>, List<? extends WeightedVector>>(datapoints,
+ new ArrayList<WeightedVector>());
+ }
+
+ int numTest = (int) (testProbability * datapoints.size());
+ Preconditions.checkArgument(numTest > 0 && numTest < datapoints.size(),
+ "Must have nonzero number of training and test vectors. Asked for %.1f %% of %d vectors for test",
+ testProbability * 100, datapoints.size());
+
+ Collections.shuffle(datapoints);
+ return new Pair<List<? extends WeightedVector>, List<? extends WeightedVector>>(
+ datapoints.subList(numTest, datapoints.size()), datapoints.subList(0, numTest));
+ }
+
+ /**
+ * Clusters the datapoints in the list doing either random seeding of the centroids or k-means++.
+ *
+ * @param datapoints the points to be clustered.
+ * @return an UpdatableSearcher with the resulting clusters.
+ */
+ public UpdatableSearcher cluster(List<? extends WeightedVector> datapoints) {
+ Pair<List<? extends WeightedVector>, List<? extends WeightedVector>> trainTestSplit = splitTrainTest(datapoints);
+ List<Vector> bestCentroids = new ArrayList<>();
+ double cost = Double.POSITIVE_INFINITY;
+ double bestCost = Double.POSITIVE_INFINITY;
+ for (int i = 0; i < numRuns; ++i) {
+ centroids.clear();
+ if (kMeansPlusPlusInit) {
+ // Use k-means++ to set initial centroids.
+ initializeSeedsKMeansPlusPlus(trainTestSplit.getFirst());
+ } else {
+ // Randomly select the initial centroids.
+ initializeSeedsRandomly(trainTestSplit.getFirst());
+ }
+ // Do k-means iterations with trimmed mean computation (aka ball k-means).
+ if (numRuns > 1) {
+ // If the clustering is successful (there are no zero-weight centroids).
+ iterativeAssignment(trainTestSplit.getFirst());
+ // Compute the cost of the clustering and possibly save the centroids.
+ cost = ClusteringUtils.totalClusterCost(
+ splitTrainTest ? datapoints : trainTestSplit.getSecond(), centroids);
+ if (cost < bestCost) {
+ bestCost = cost;
+ bestCentroids.clear();
+ Iterables.addAll(bestCentroids, centroids);
+ }
+ } else {
+ // If there is only going to be one run, the cost doesn't need to be computed, so we just return the clustering.
+ iterativeAssignment(datapoints);
+ return centroids;
+ }
+ }
+ if (bestCost == Double.POSITIVE_INFINITY) {
+ throw new RuntimeException("No valid clustering was found");
+ }
+ if (cost != bestCost) {
+ centroids.clear();
+ centroids.addAll(bestCentroids);
+ }
+ if (correctWeights) {
+ for (WeightedVector testDatapoint : trainTestSplit.getSecond()) {
+ WeightedVector closest = (WeightedVector) centroids.searchFirst(testDatapoint, false).getValue();
+ closest.setWeight(closest.getWeight() + testDatapoint.getWeight());
+ }
+ }
+ return centroids;
+ }
+
+ /**
+ * Selects some of the original points randomly with probability proportional to their weights. This is much
+ * less sophisticated than the kmeans++ approach, however it is faster and coupled with
+ *
+ * The side effect of this method is to fill the centroids structure itself.
+ *
+ * @param datapoints The datapoints to select from. These datapoints should be WeightedVectors of some kind.
+ */
+ private void initializeSeedsRandomly(List<? extends WeightedVector> datapoints) {
+ int numDatapoints = datapoints.size();
+ double totalWeight = 0;
+ for (WeightedVector datapoint : datapoints) {
+ totalWeight += datapoint.getWeight();
+ }
+ Multinomial<Integer> seedSelector = new Multinomial<>();
+ for (int i = 0; i < numDatapoints; ++i) {
+ seedSelector.add(i, datapoints.get(i).getWeight() / totalWeight);
+ }
+ for (int i = 0; i < numClusters; ++i) {
+ int sample = seedSelector.sample();
+ seedSelector.delete(sample);
+ Centroid centroid = new Centroid(datapoints.get(sample));
+ centroid.setIndex(i);
+ centroids.add(centroid);
+ }
+ }
+
+ /**
+ * Selects some of the original points according to the k-means++ algorithm. The basic idea is that
+ * points are selected with probability proportional to their distance from any selected point. In
+ * this version, points have weights which multiply their likelihood of being selected. This is the
+ * same as if there were as many copies of the same point as indicated by the weight.
+ *
+ * This is pretty expensive, but it vastly improves the quality and convergences of the k-means algorithm.
+ * The basic idea can be made much faster by only processing a random subset of the original points.
+ * In the context of streaming k-means, the total number of possible seeds will be about k log n so this
+ * selection will cost O(k^2 (log n)^2) which isn't much worse than the random sampling idea. At
+ * n = 10^9, the cost of this initialization will be about 10x worse than a reasonable random sampling
+ * implementation.
+ *
+ * The side effect of this method is to fill the centroids structure itself.
+ *
+ * @param datapoints The datapoints to select from. These datapoints should be WeightedVectors of some kind.
+ */
+ private void initializeSeedsKMeansPlusPlus(List<? extends WeightedVector> datapoints) {
+ Preconditions.checkArgument(datapoints.size() > 1, "Must have at least two datapoints points to cluster " +
+ "sensibly");
+ Preconditions.checkArgument(datapoints.size() >= numClusters,
+ String.format("Must have more datapoints [%d] than clusters [%d]", datapoints.size(), numClusters));
+ // Compute the centroid of all of the datapoints. This is then used to compute the squared radius of the datapoints.
+ Centroid center = new Centroid(datapoints.iterator().next());
+ for (WeightedVector row : Iterables.skip(datapoints, 1)) {
+ center.update(row);
+ }
+
+ // Given the centroid, we can compute \Delta_1^2(X), the total squared distance for the datapoints
+ // this accelerates seed selection.
+ double deltaX = 0;
+ DistanceMeasure distanceMeasure = centroids.getDistanceMeasure();
+ for (WeightedVector row : datapoints) {
+ deltaX += distanceMeasure.distance(row, center);
+ }
+
+ // Find the first seed c_1 (and conceptually the second, c_2) as might be done in the 2-means clustering so that
+ // the probability of selecting c_1 and c_2 is proportional to || c_1 - c_2 ||^2. This is done
+ // by first selecting c_1 with probability:
+ //
+ // p(c_1) = sum_{c_1} || c_1 - c_2 ||^2 \over sum_{c_1, c_2} || c_1 - c_2 ||^2
+ //
+ // This can be simplified to:
+ //
+ // p(c_1) = \Delta_1^2(X) + n || c_1 - c ||^2 / (2 n \Delta_1^2(X))
+ //
+ // where c = \sum x / n and \Delta_1^2(X) = sum || x - c ||^2
+ //
+ // All subsequent seeds c_i (including c_2) can then be selected from the remaining points with probability
+ // proportional to Pr(c_i == x_j) = min_{m < i} || c_m - x_j ||^2.
+
+ // Multinomial distribution of vector indices for the selection seeds. These correspond to
+ // the indices of the vectors in the original datapoints list.
+ Multinomial<Integer> seedSelector = new Multinomial<>();
+ for (int i = 0; i < datapoints.size(); ++i) {
+ double selectionProbability =
+ deltaX + datapoints.size() * distanceMeasure.distance(datapoints.get(i), center);
+ seedSelector.add(i, selectionProbability);
+ }
+
+ int selected = random.nextInt(datapoints.size());
+ Centroid c_1 = new Centroid(datapoints.get(selected).clone());
+ c_1.setIndex(0);
+ // Construct a set of weighted things which can be used for random selection. Initial weights are
+ // set to the squared distance from c_1
+ for (int i = 0; i < datapoints.size(); ++i) {
+ WeightedVector row = datapoints.get(i);
+ double w = distanceMeasure.distance(c_1, row) * 2 * Math.log(1 + row.getWeight());
+ seedSelector.set(i, w);
+ }
+
+ // From here, seeds are selected with probability proportional to:
+ //
+ // r_i = min_{c_j} || x_i - c_j ||^2
+ //
+ // when we only have c_1, we have already set these distances and as we select each new
+ // seed, we update the minimum distances.
+ centroids.add(c_1);
+ int clusterIndex = 1;
+ while (centroids.size() < numClusters) {
+ // Select according to weights.
+ int seedIndex = seedSelector.sample();
+ Centroid nextSeed = new Centroid(datapoints.get(seedIndex));
+ nextSeed.setIndex(clusterIndex++);
+ centroids.add(nextSeed);
+ // Don't select this one again.
+ seedSelector.delete(seedIndex);
+ // Re-weight everything according to the minimum distance to a seed.
+ for (int currSeedIndex : seedSelector) {
+ WeightedVector curr = datapoints.get(currSeedIndex);
+ double newWeight = nextSeed.getWeight() * distanceMeasure.distance(nextSeed, curr);
+ if (newWeight < seedSelector.getWeight(currSeedIndex)) {
+ seedSelector.set(currSeedIndex, newWeight);
+ }
+ }
+ }
+ }
+
+ /**
+ * Examines the datapoints and updates cluster centers to be the centroid of the nearest datapoints points. To
+ * compute a new center for cluster c_i, we average all points that are closer than d_i * trimFraction
+ * where d_i is
+ *
+ * d_i = min_j \sqrt ||c_j - c_i||^2
+ *
+ * By ignoring distant points, the centroids converge more quickly to a good approximation of the
+ * optimal k-means solution (given good starting points).
+ *
+ * @param datapoints the points to cluster.
+ */
+ private void iterativeAssignment(List<? extends WeightedVector> datapoints) {
+ DistanceMeasure distanceMeasure = centroids.getDistanceMeasure();
+ // closestClusterDistances.get(i) is the distance from the i'th cluster to its closest
+ // neighboring cluster.
+ List<Double> closestClusterDistances = new ArrayList<>(numClusters);
+ // clusterAssignments[i] == j means that the i'th point is assigned to the j'th cluster. When
+ // these don't change, we are done.
+ // Each point is assigned to the invalid "-1" cluster initially.
+ List<Integer> clusterAssignments = new ArrayList<>(Collections.nCopies(datapoints.size(), -1));
+
+ boolean changed = true;
+ for (int i = 0; changed && i < maxNumIterations; i++) {
+ changed = false;
+ // We compute what the distance between each cluster and its closest neighbor is to set a
+ // proportional distance threshold for points that should be involved in calculating the
+ // centroid.
+ closestClusterDistances.clear();
+ for (Vector center : centroids) {
+ // If a centroid has no points assigned to it, the clustering failed.
+ Vector closestOtherCluster = centroids.searchFirst(center, true).getValue();
+ closestClusterDistances.add(distanceMeasure.distance(center, closestOtherCluster));
+ }
+
+ // Copies the current cluster centroids to newClusters and sets their weights to 0. This is
+ // so we calculate the new centroids as we go through the datapoints.
+ List<Centroid> newCentroids = new ArrayList<>();
+ for (Vector centroid : centroids) {
+ // need a deep copy because we will mutate these values
+ Centroid newCentroid = (Centroid)centroid.clone();
+ newCentroid.setWeight(0);
+ newCentroids.add(newCentroid);
+ }
+
+ // Pass over the datapoints computing new centroids.
+ for (int j = 0; j < datapoints.size(); ++j) {
+ WeightedVector datapoint = datapoints.get(j);
+ // Get the closest cluster this point belongs to.
+ WeightedThing<Vector> closestPair = centroids.searchFirst(datapoint, false);
+ int closestIndex = ((WeightedVector) closestPair.getValue()).getIndex();
+ double closestDistance = closestPair.getWeight();
+ // Update its cluster assignment if necessary.
+ if (closestIndex != clusterAssignments.get(j)) {
+ changed = true;
+ clusterAssignments.set(j, closestIndex);
+ }
+ // Only update if the datapoints point is near enough. What this means is that the weight
+ // of outliers is NOT taken into account and the final weights of the centroids will
+ // reflect this (it will be less or equal to the initial sum of the weights).
+ if (closestDistance < trimFraction * closestClusterDistances.get(closestIndex)) {
+ newCentroids.get(closestIndex).update(datapoint);
+ }
+ }
+ // Add the new centers back into searcher.
+ centroids.clear();
+ centroids.addAll(newCentroids);
+ }
+
+ if (correctWeights) {
+ for (Vector v : centroids) {
+ ((Centroid)v).setWeight(0);
+ }
+ for (WeightedVector datapoint : datapoints) {
+ Centroid closestCentroid = (Centroid) centroids.searchFirst(datapoint, false).getValue();
+ closestCentroid.setWeight(closestCentroid.getWeight() + datapoint.getWeight());
+ }
+ }
+ }
+
+ @Override
+ public Iterator<Centroid> iterator() {
+ return Iterators.transform(centroids.iterator(), new Function<Vector, Centroid>() {
+ @Override
+ public Centroid apply(Vector input) {
+ Preconditions.checkArgument(input instanceof Centroid, "Non-centroid in centroids " +
+ "searcher");
+ //noinspection ConstantConditions
+ return (Centroid)input;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeans.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeans.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeans.java
new file mode 100644
index 0000000..604bc9d
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeans.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.cluster;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.Centroid;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.jet.math.Constants;
+import org.apache.mahout.math.neighborhood.UpdatableSearcher;
+import org.apache.mahout.math.random.WeightedThing;
+
+/**
+ * Implements a streaming k-means algorithm for weighted vectors.
+ * The goal clustering points one at a time, especially useful for MapReduce mappers that get inputs one at a time.
+ *
+ * A rough description of the algorithm:
+ * Suppose there are l clusters at one point and a new point p is added.
+ * The new point can either be added to one of the existing l clusters or become a new cluster. To decide:
+ * - let c be the closest cluster to point p;
+ * - let d be the distance between c and p;
+ * - if d > distanceCutoff, create a new cluster from p (p is too far away from the clusters to be part of them;
+ * distanceCutoff represents the largest distance from a point its assigned cluster's centroid);
+ * - else (d <= distanceCutoff), create a new cluster with probability d / distanceCutoff (the probability of creating
+ * a new cluster increases as d increases).
+ * There will be either l points or l + 1 points after processing a new point.
+ *
+ * As the number of clusters increases, it will go over the numClusters limit (numClusters represents a recommendation
+ * for the number of clusters that there should be at the end). To decrease the number of clusters the existing clusters
+ * are treated as data points and are re-clustered (collapsed). This tends to make the number of clusters go down.
+ * If the number of clusters is still too high, distanceCutoff is increased.
+ *
+ * For more details, see:
+ * - "Streaming k-means approximation" by N. Ailon, R. Jaiswal, C. Monteleoni
+ * http://books.nips.cc/papers/files/nips22/NIPS2009_1085.pdf
+ * - "Fast and Accurate k-means for Large Datasets" by M. Shindler, A. Wong, A. Meyerson,
+ * http://books.nips.cc/papers/files/nips24/NIPS2011_1271.pdf
+ */
+public class StreamingKMeans implements Iterable<Centroid> {
+ /**
+ * The searcher containing the centroids that resulted from the clustering of points until now. When adding a new
+ * point we either assign it to one of the existing clusters in this searcher or create a new centroid for it.
+ */
+ private final UpdatableSearcher centroids;
+
+ /**
+ * The estimated number of clusters to cluster the data in. If the actual number of clusters increases beyond this
+ * limit, the clusters will be "collapsed" (re-clustered, by treating them as data points). This doesn't happen
+ * recursively and a collapse might not necessarily make the number of actual clusters drop to less than this limit.
+ *
+ * If the goal is clustering a large data set into k clusters, numClusters SHOULD NOT BE SET to k. StreamingKMeans is
+ * useful to reduce the size of the data set by the mappers so that it can fit into memory in one reducer that runs
+ * BallKMeans.
+ *
+ * It is NOT MEANT to cluster the data into k clusters in one pass because it can't guarantee that there will in fact
+ * be k clusters in total. This is because of the dynamic nature of numClusters over the course of the runtime.
+ * To get an exact number of clusters, another clustering algorithm needs to be applied to the results.
+ */
+ private int numClusters;
+
+ /**
+ * The number of data points seen so far. This is important for re-estimating numClusters when deciding to collapse
+ * the existing clusters.
+ */
+ private int numProcessedDatapoints = 0;
+
+ /**
+ * This is the current value of the distance cutoff. Points which are much closer than this to a centroid will stick
+ * to it almost certainly. Points further than this to any centroid will form a new cluster.
+ *
+ * This increases (is multiplied by beta) when a cluster collapse did not make the number of clusters drop to below
+ * numClusters (it effectively increases the tolerance for cluster compactness discouraging the creation of new
+ * clusters). Since a collapse only happens when centroids.size() > clusterOvershoot * numClusters, the cutoff
+ * increases when the collapse didn't at least remove the slack in the number of clusters.
+ */
+ private double distanceCutoff;
+
+ /**
+ * Parameter that controls the growth of the distanceCutoff. After n increases of the
+ * distanceCutoff starting at d_0, the final value is d_0 * beta^n (distance cutoffs increase following a geometric
+ * progression with ratio beta).
+ */
+ private final double beta;
+
+ /**
+ * Multiplying clusterLogFactor with numProcessedDatapoints gets an estimate of the suggested
+ * number of clusters. This mirrors the recommended number of clusters for n points where there should be k actual
+ * clusters, k * log n. In the case of our estimate we use clusterLogFactor * log(numProcessedDataPoints).
+ *
+ * It is important to note that numClusters is NOT k. It is an estimate of k * log n.
+ */
+ private final double clusterLogFactor;
+
+ /**
+ * Centroids are collapsed when the number of clusters becomes greater than clusterOvershoot * numClusters. This
+ * effectively means having a slack in numClusters so that the actual number of centroids, centroids.size() tracks
+ * numClusters approximately. The idea is that the actual number of clusters should be at least numClusters but not
+ * much more (so that we don't end up having 1 cluster / point).
+ */
+ private final double clusterOvershoot;
+
+ /**
+ * Random object to sample values from.
+ */
+ private final Random random = RandomUtils.getRandom();
+
/**
 * Convenience constructor. Calls
 * StreamingKMeans(searcher, numClusters, 1.0 / numClusters, 1.3, 20, 2).
 * (The previous comment claimed "1.3, 10, 2", which did not match the actual delegation.)
 * @see StreamingKMeans#StreamingKMeans(org.apache.mahout.math.neighborhood.UpdatableSearcher, int,
 * double, double, double, double)
 */
public StreamingKMeans(UpdatableSearcher searcher, int numClusters) {
  this(searcher, numClusters, 1.0 / numClusters, 1.3, 20, 2);
}
+
/**
 * Convenience constructor. Calls
 * StreamingKMeans(searcher, numClusters, distanceCutoff, 1.3, 20, 2).
 * (The previous comment claimed "1.3, 10, 2", which did not match the actual delegation.)
 * @see StreamingKMeans#StreamingKMeans(org.apache.mahout.math.neighborhood.UpdatableSearcher, int,
 * double, double, double, double)
 */
public StreamingKMeans(UpdatableSearcher searcher, int numClusters, double distanceCutoff) {
  this(searcher, numClusters, distanceCutoff, 1.3, 20, 2);
}
+
/**
 * Creates a new StreamingKMeans class given a searcher and the number of clusters to generate.
 *
 * Note that no argument validation is performed here; in particular the searcher is not
 * checked for emptiness even though the contract below requires it.
 *
 * @param searcher A Searcher that is used for performing nearest neighbor search. It MUST BE
 *                 EMPTY initially because it will be used to keep track of the cluster
 *                 centroids.
 * @param numClusters An estimated number of clusters to generate for the data points.
 *                    This can be adjusted at runtime; the actual number produced will
 *                    depend on the data.
 * @param distanceCutoff The initial distance cutoff representing the value of the
 *                       distance between a point and its closest centroid after which
 *                       the new point will definitely be assigned to a new cluster.
 * @param beta Ratio of geometric progression to use when increasing distanceCutoff. After n increases, distanceCutoff
 *             becomes distanceCutoff * beta^n. A smaller value increases the distanceCutoff less aggressively.
 * @param clusterLogFactor Value multiplied with the number of points counted so far estimating the number of clusters
 *                         to aim for. If the final number of clusters is known and this clustering is only for a
 *                         sketch of the data, this can be the final number of clusters, k.
 * @param clusterOvershoot Multiplicative slack factor for slowing down the collapse of the clusters.
 */
public StreamingKMeans(UpdatableSearcher searcher, int numClusters,
    double distanceCutoff, double beta, double clusterLogFactor, double clusterOvershoot) {
  this.centroids = searcher;
  this.numClusters = numClusters;
  this.distanceCutoff = distanceCutoff;
  this.beta = beta;
  this.clusterLogFactor = clusterLogFactor;
  this.clusterOvershoot = clusterOvershoot;
}
+
/**
 * @return an Iterator over the Centroids contained in this clusterer. The cast is unchecked:
 *         if the searcher ever holds a Vector that is not a Centroid, a ClassCastException
 *         will surface lazily when that element is iterated.
 */
@Override
public Iterator<Centroid> iterator() {
  return Iterators.transform(centroids.iterator(), new Function<Vector, Centroid>() {
    @Override
    public Centroid apply(Vector input) {
      return (Centroid)input;
    }
  });
}
+
/**
 * Cluster the rows of a matrix, treating each row as a Centroid.
 * The transformation is lazy: rows are wrapped as Centroids on demand while clustering consumes
 * the Iterable.
 *
 * @param data matrix whose rows are to be clustered.
 * @return the UpdatableSearcher containing the resulting centroids.
 */
public UpdatableSearcher cluster(Matrix data) {
  return cluster(Iterables.transform(data, new Function<MatrixSlice, Centroid>() {
    @Override
    public Centroid apply(MatrixSlice input) {
      // The key in a Centroid is actually the MatrixSlice's index.
      return Centroid.create(input.index(), input.vector());
    }
  }));
}
+
/**
 * Cluster the data points in an Iterable&lt;Centroid&gt;.
 * Thin wrapper over clusterInternal with collapseClusters == false (i.e. a top-level,
 * non-collapsing clustering pass).
 *
 * @param datapoints Iterable whose elements are to be clustered.
 * @return the UpdatableSearcher containing the resulting centroids.
 */
public UpdatableSearcher cluster(Iterable<Centroid> datapoints) {
  return clusterInternal(datapoints, false);
}
+
+ /**
+ * Cluster one data point.
+ * @param datapoint to be clustered.
+ * @return the UpdatableSearcher containing the resulting centroids.
+ */
+ public UpdatableSearcher cluster(final Centroid datapoint) {
+ return cluster(new Iterable<Centroid>() {
+ @Override
+ public Iterator<Centroid> iterator() {
+ return new Iterator<Centroid>() {
+ private boolean accessed = false;
+
+ @Override
+ public boolean hasNext() {
+ return !accessed;
+ }
+
+ @Override
+ public Centroid next() {
+ accessed = true;
+ return datapoint;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ });
+ }
+
/**
 * @return the number of clusters computed from the points until now.
 */
public int getNumClusters() {
  // The live cluster count is the number of centroids held by the searcher,
  // not the numClusters target passed to the constructor.
  return centroids.size();
}
+
/**
 * Internal clustering method that gets called from the other wrappers.
 *
 * @param datapoints Iterable of data points to be clustered.
 * @param collapseClusters whether this is an "inner" clustering and the datapoints are the previously computed
 *                         centroids. Some logic is different to ensure counters are consistent but it behaves
 *                         nearly the same.
 * @return the UpdatableSearcher containing the resulting centroids.
 */
private UpdatableSearcher clusterInternal(Iterable<Centroid> datapoints, boolean collapseClusters) {
  Iterator<Centroid> datapointsIterator = datapoints.iterator();
  // Empty input: nothing to do, return the centroids unchanged.
  if (!datapointsIterator.hasNext()) {
    return centroids;
  }

  // Saved so the counter can be restored after a recursive collapse pass;
  // a collapse re-processes existing centroids, not new data points.
  int oldNumProcessedDataPoints = numProcessedDatapoints;
  // We clear the centroids we have in case of cluster collapse, the old clusters are the
  // datapoints but we need to re-cluster them.
  if (collapseClusters) {
    centroids.clear();
    numProcessedDatapoints = 0;
  }

  if (centroids.size() == 0) {
    // Assign the first datapoint to the first cluster.
    // Adding a vector to a searcher would normally just reference the copy,
    // but we could potentially mutate it and so we need to make a clone.
    centroids.add(datapointsIterator.next().clone());
    ++numProcessedDatapoints;
  }

  // To cluster, we scan the data and either add each point to the nearest group or create a new group.
  // when we get too many groups, we need to increase the threshold and rescan our current groups
  while (datapointsIterator.hasNext()) {
    Centroid row = datapointsIterator.next();
    // Get the closest vector and its weight as a WeightedThing<Vector>.
    // The weight of the WeightedThing is the distance to the query and the value is a
    // reference to one of the vectors we added to the searcher previously.
    WeightedThing<Vector> closestPair = centroids.searchFirst(row, false);

    // We get a uniformly distributed random number between 0 and 1 and compare it with the
    // distance to the closest cluster divided by the distanceCutoff.
    // This is so that if the closest cluster is further than distanceCutoff,
    // closestPair.getWeight() / distanceCutoff > 1 which will trigger the creation of a new
    // cluster anyway.
    // However, if the ratio is less than 1, we want to create a new cluster with probability
    // proportional to the distance to the closest cluster.
    // Note the point's own weight also scales the probability: heavier points
    // are more likely to seed their own cluster.
    double sample = random.nextDouble();
    if (sample < row.getWeight() * closestPair.getWeight() / distanceCutoff) {
      // Add new centroid, note that the vector is copied because we may mutate it later.
      centroids.add(row.clone());
    } else {
      // Merge the new point with the existing centroid. This will update the centroid's actual
      // position.
      // We know that all the points we inserted in the centroids searcher are (or extend)
      // WeightedVector, so the cast will always succeed.
      Centroid centroid = (Centroid) closestPair.getValue();

      // We will update the centroid by removing it from the searcher and reinserting it to
      // ensure consistency.
      if (!centroids.remove(centroid, Constants.EPSILON)) {
        throw new RuntimeException("Unable to remove centroid");
      }
      centroid.update(row);
      centroids.add(centroid);

    }
    ++numProcessedDatapoints;

    // Too many clusters relative to the allowed overshoot: grow the target
    // logarithmically with the number of processed points, then collapse.
    if (!collapseClusters && centroids.size() > clusterOvershoot * numClusters) {
      numClusters = (int) Math.max(numClusters, clusterLogFactor * Math.log(numProcessedDatapoints));

      // Shuffle before re-clustering so the collapse does not depend on the
      // searcher's iteration order.
      List<Centroid> shuffled = new ArrayList<>();
      for (Vector vector : centroids) {
        shuffled.add((Centroid) vector);
      }
      Collections.shuffle(shuffled);
      // Re-cluster using the shuffled centroids as data points. The centroids member variable
      // is modified directly.
      clusterInternal(shuffled, true);

      // If collapsing did not reduce the count enough, loosen the distance
      // threshold so future merges become more likely.
      if (centroids.size() > numClusters) {
        distanceCutoff *= beta;
      }
    }
  }

  // Restore the counter after an inner collapse pass (see above).
  if (collapseClusters) {
    numProcessedDatapoints = oldNumProcessedDataPoints;
  }
  return centroids;
}
+
+ public void reindexCentroids() {
+ int numCentroids = 0;
+ for (Centroid centroid : this) {
+ centroid.setIndex(numCentroids++);
+ }
+ }
+
/**
 * @return the distanceCutoff (an upper bound for the maximum distance within a cluster).
 */
public double getDistanceCutoff() {
  return distanceCutoff;
}

/**
 * Sets the distance cutoff (an upper bound for the maximum distance within a cluster).
 * @param distanceCutoff the new cutoff value.
 */
public void setDistanceCutoff(double distanceCutoff) {
  this.distanceCutoff = distanceCutoff;
}
+
/**
 * @return the DistanceMeasure used by the underlying centroid searcher.
 */
public DistanceMeasure getDistanceMeasure() {
  return centroids.getDistanceMeasure();
}
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/CentroidWritable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/CentroidWritable.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/CentroidWritable.java
new file mode 100644
index 0000000..a41940b
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/CentroidWritable.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.Centroid;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class CentroidWritable implements Writable {
+ private Centroid centroid = null;
+
+ public CentroidWritable() {}
+
+ public CentroidWritable(Centroid centroid) {
+ this.centroid = centroid;
+ }
+
+ public Centroid getCentroid() {
+ return centroid;
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ dataOutput.writeInt(centroid.getIndex());
+ dataOutput.writeDouble(centroid.getWeight());
+ VectorWritable.writeVector(dataOutput, centroid.getVector());
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ if (centroid == null) {
+ centroid = read(dataInput);
+ return;
+ }
+ centroid.setIndex(dataInput.readInt());
+ centroid.setWeight(dataInput.readDouble());
+ centroid.assign(VectorWritable.readVector(dataInput));
+ }
+
+ public static Centroid read(DataInput dataInput) throws IOException {
+ int index = dataInput.readInt();
+ double weight = dataInput.readDouble();
+ Vector v = VectorWritable.readVector(dataInput);
+ return new Centroid(index, v, weight);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof CentroidWritable)) {
+ return false;
+ }
+ CentroidWritable writable = (CentroidWritable) o;
+ return centroid.equals(writable.centroid);
+ }
+
+ @Override
+ public int hashCode() {
+ return centroid.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return centroid.toString();
+ }
+}