You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by je...@apache.org on 2012/02/12 21:54:26 UTC

svn commit: r1243326 - /mahout/trunk/core/src/main/java/org/apache/mahout/clustering/

Author: jeastman
Date: Sun Feb 12 20:54:26 2012
New Revision: 1243326

URL: http://svn.apache.org/viewvc?rev=1243326&view=rev
Log:
MAHOUT-933: 
- Implemented ClusteringPolicyWritable and made ClusteringPolicy extend Writable (so every clustering policy implements it), allowing policies to be written to the file system.
- Modified ClusterIterator and CIMapper to write and read the appropriate clustering policy. 

Next steps must address the fact that clusters do not serialize their observation state (s0, s1, s2), so the MapReduce (MR) version of ClusterIterator does not actually produce correct values. This will be a much bigger project.

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusteringPolicyWritable.java   (with props)
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/CIMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterIterator.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusteringPolicy.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/DirichletClusteringPolicy.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/FuzzyKMeansClusteringPolicy.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/KMeansClusteringPolicy.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/CIMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/CIMapper.java?rev=1243326&r1=1243325&r2=1243326&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/CIMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/CIMapper.java Sun Feb 12 20:54:26 2012
@@ -28,8 +28,9 @@ public class CIMapper extends Mapper<Wri
   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
     String priorClustersPath = context.getConfiguration().get(ClusterIterator.PRIOR_PATH_KEY);
+    String policyPath = context.getConfiguration().get(ClusterIterator.POLICY_PATH_KEY);
     classifier = ClusterIterator.readClassifier(new Path(priorClustersPath));
-    policy = new KMeansClusteringPolicy();
+    policy = ClusterIterator.readPolicy(new Path(policyPath));
     super.setup(context);
   }
   

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterIterator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterIterator.java?rev=1243326&r1=1243325&r2=1243326&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterIterator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterIterator.java Sun Feb 12 20:54:26 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -54,6 +55,7 @@ import com.google.common.io.Closeables;
 public class ClusterIterator {
   
   public static final String PRIOR_PATH_KEY = "org.apache.mahout.clustering.prior.path";
+  public static final String POLICY_PATH_KEY = "org.apache.mahout.clustering.policy.path";
   
   public ClusterIterator(ClusteringPolicy policy) {
     this.policy = policy;
@@ -151,6 +153,9 @@ public class ClusterIterator {
       InterruptedException, ClassNotFoundException {
     Configuration conf = new Configuration();
     HadoopUtil.delete(conf, outPath);
+    Path policyPath = new Path(outPath, "policy.seq");
+    writePolicy(policy, policyPath);
+    conf.set(POLICY_PATH_KEY, policyPath.toString());
     for (int iteration = 1; iteration <= numIterations; iteration++) {
       conf.set(PRIOR_PATH_KEY, priorPath.toString());
       
@@ -237,4 +242,23 @@ public class ClusterIterator {
     ClusterClassifier classifierOut = new ClusterClassifier(clusters);
     return classifierOut;
   }
+  
+  /**
+   * Reads the ClusteringPolicy stored as the first record of the SequenceFile at policyPath,
+   * as written by {@link #writePolicy(ClusteringPolicy, Path)}.
+   *
+   * @param policyPath path of the policy SequenceFile
+   * @return the deserialized policy
+   * @throws IOException if the file cannot be read or contains no record
+   */
+  public static ClusteringPolicy readPolicy(Path policyPath) throws IOException {
+    Configuration config = new Configuration();
+    FileSystem fs = FileSystem.get(policyPath.toUri(), config);
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, policyPath, config);
+    try {
+      Text key = new Text();
+      ClusteringPolicyWritable cpw = new ClusteringPolicyWritable();
+      // next() reports whether a record was read; ignoring it would return a null policy
+      // from the empty wrapper when the file holds no record.
+      if (!reader.next(key, cpw)) {
+        throw new IOException("No clustering policy record found in " + policyPath);
+      }
+      return cpw.getValue();
+    } finally {
+      // The record (if any) is fully read; a failure while closing must not hide it.
+      Closeables.close(reader, true);
+    }
+  }
+  
+  public static void writePolicy(ClusteringPolicy policy, Path policyPath) throws IOException {
+    Configuration config = new Configuration();
+    FileSystem fs = FileSystem.get(policyPath.toUri(), config);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, config, policyPath, Text.class,
+        ClusteringPolicyWritable.class);
+    writer.append(new Text(), new ClusteringPolicyWritable(policy));
+    writer.close();
+  }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusteringPolicy.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusteringPolicy.java?rev=1243326&r1=1243325&r2=1243326&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusteringPolicy.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusteringPolicy.java Sun Feb 12 20:54:26 2012
@@ -16,13 +16,14 @@
  */
 package org.apache.mahout.clustering;
 
+import org.apache.hadoop.io.Writable;
 import org.apache.mahout.math.Vector;
 
 /**
  * A ClusteringPolicy captures the semantics of assignment of points to clusters
  * 
  */
-public interface ClusteringPolicy {
+public interface ClusteringPolicy extends Writable{
   
   /**
    * Return the index of the most appropriate model

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusteringPolicyWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusteringPolicyWritable.java?rev=1243326&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusteringPolicyWritable.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusteringPolicyWritable.java Sun Feb 12 20:54:26 2012
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.classifier.sgd.PolymorphicWritable;
+
+public class ClusteringPolicyWritable implements Writable {
+  
+  private ClusteringPolicy value;
+  
+  public ClusteringPolicyWritable(ClusteringPolicy policy) {
+    this.value = policy;
+  }
+
+  public ClusteringPolicyWritable() {
+  }
+
+  public ClusteringPolicy getValue() {
+    return value;
+  }
+  
+  public void setValue(ClusteringPolicy value) {
+    this.value = value;
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    PolymorphicWritable.write(out, value);
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    value = PolymorphicWritable.read(in, ClusteringPolicy.class);
+  }
+  
+}

Propchange: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusteringPolicyWritable.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/DirichletClusteringPolicy.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/DirichletClusteringPolicy.java?rev=1243326&r1=1243325&r2=1243326&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/DirichletClusteringPolicy.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/DirichletClusteringPolicy.java Sun Feb 12 20:54:26 2012
@@ -16,13 +16,22 @@
  */
 package org.apache.mahout.clustering;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 import org.apache.mahout.clustering.dirichlet.UncommonDistributions;
 import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.SequentialAccessSparseVector;
 import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
 
 public class DirichletClusteringPolicy implements ClusteringPolicy {
   
+  /**
+   * Creates a policy with no state set. Presumably used for Writable deserialization, with the
+   * fields then populated by readFields() — confirm against callers.
+   */
+  public DirichletClusteringPolicy() {
+    // Implicit super(); alpha0, totalCounts and mixture keep their Java default values.
+  }
+
   public DirichletClusteringPolicy(int k, double alpha0) {
     this.totalCounts = new DenseVector(k);
     this.alpha0 = alpha0;
@@ -34,11 +43,14 @@ public class DirichletClusteringPolicy i
   private Vector mixture;
   
   // Alpha_0 primes the Dirichlet distribution
-  private final double alpha0;
+  private double alpha0;
   
   // Total observed over all time
-  private final Vector totalCounts;
+  private Vector totalCounts;
   
+  /* (non-Javadoc)
+   * @see org.apache.mahout.clustering.ClusteringPolicy#select(org.apache.mahout.math.Vector)
+   */
   @Override
   public Vector select(Vector probabilities) {
     int rMultinom = UncommonDistributions.rMultinom(probabilities.times(mixture));
@@ -48,6 +60,9 @@ public class DirichletClusteringPolicy i
   }
   
   // update the total counts and then the mixture
+  /* (non-Javadoc)
+   * @see org.apache.mahout.clustering.ClusteringPolicy#update(org.apache.mahout.clustering.ClusterClassifier)
+   */
   @Override
   public void update(ClusterClassifier prior) {
     for (int i = 0; i < totalCounts.size(); i++) {
@@ -56,4 +71,24 @@ public class DirichletClusteringPolicy i
     }
     mixture = UncommonDistributions.rDirichlet(totalCounts, alpha0);
   }
+
+  /**
+   * Serializes this policy as alpha0, then totalCounts, then mixture.
+   * {@link #readFields(DataInput)} reads the fields back in this same order, so the two
+   * methods must be changed together.
+   *
+   * @param out the stream to write to
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeDouble(alpha0);
+    VectorWritable.writeVector(out, totalCounts);
+    // NOTE(review): on an instance built with the no-arg constructor, totalCounts and mixture
+    // are unset until readFields() assigns them; confirm such an instance is never written.
+    VectorWritable.writeVector(out, mixture);
+  }
+
+  /**
+   * Restores the state written by {@link #write(DataOutput)}: alpha0, then totalCounts, then
+   * mixture, replacing whatever values this instance held before.
+   *
+   * @param in the stream to read from
+   * @throws IOException if reading from {@code in} fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // alpha0 and totalCounts are non-final so that they can be reassigned here.
+    this.alpha0 = in.readDouble();
+    this.totalCounts = VectorWritable.readVector(in);
+    this.mixture = VectorWritable.readVector(in);
+  }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/FuzzyKMeansClusteringPolicy.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/FuzzyKMeansClusteringPolicy.java?rev=1243326&r1=1243325&r2=1243326&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/FuzzyKMeansClusteringPolicy.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/FuzzyKMeansClusteringPolicy.java Sun Feb 12 20:54:26 2012
@@ -16,6 +16,10 @@
  */
 package org.apache.mahout.clustering;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 import org.apache.mahout.math.Vector;
 
 /**
@@ -25,6 +29,7 @@ import org.apache.mahout.math.Vector;
  */
 public class FuzzyKMeansClusteringPolicy implements ClusteringPolicy {
     
+  private double m;
   /*
    * (non-Javadoc)
    * 
@@ -44,5 +49,21 @@ public class FuzzyKMeansClusteringPolicy
   public Vector select(Vector probabilities) {
     return probabilities;
   }
+
+  /**
+   * Serializes the fuzziness field m as a single double; must stay in step with
+   * {@link #readFields(DataInput)}.
+   *
+   * @param out the stream to write to
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // NOTE(review): in the visible code only readFields() assigns m, so a freshly constructed
+    // policy writes 0.0 here; confirm m is initialized elsewhere before this is relied upon.
+    out.writeDouble(m);
+  }
+
+  /**
+   * Restores m from the single double written by {@link #write(DataOutput)}.
+   *
+   * @param in the stream to read from
+   * @throws IOException if reading from {@code in} fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.m = in.readDouble();
+  }
   
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/KMeansClusteringPolicy.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/KMeansClusteringPolicy.java?rev=1243326&r1=1243325&r2=1243326&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/KMeansClusteringPolicy.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/KMeansClusteringPolicy.java Sun Feb 12 20:54:26 2012
@@ -16,6 +16,10 @@
  */
 package org.apache.mahout.clustering;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 import org.apache.mahout.math.SequentialAccessSparseVector;
 import org.apache.mahout.math.Vector;
 
@@ -44,5 +48,21 @@ public class KMeansClusteringPolicy impl
   public void update(ClusterClassifier posterior) {
     // nothing to do here
   }
+
+  /**
+   * Writes nothing: this policy serializes no state, so its serialized form is empty.
+   *
+   * @param out the stream to write to (unused)
+   * @throws IOException declared by the Writable contract; never thrown here
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // nothing to do here
+  }
+
+  /**
+   * Reads nothing, matching {@link #write(DataOutput)}, which writes no state.
+   *
+   * @param in the stream to read from (unused)
+   * @throws IOException declared by the Writable contract; never thrown here
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // nothing to do here
+  }
   
 }