Posted to commits@mahout.apache.org by sr...@apache.org on 2010/10/01 09:40:53 UTC

svn commit: r1003415 - in /mahout/trunk: core/src/main/java/org/apache/mahout/clustering/minhash/ core/src/main/java/org/apache/mahout/common/commandline/ core/src/test/java/org/apache/mahout/clustering/minhash/ examples/src/main/java/org/apache/mahout...

Author: srowen
Date: Fri Oct  1 07:40:53 2010
New Revision: 1003415

URL: http://svn.apache.org/viewvc?rev=1003415&view=rev
Log:
MAHOUT-344

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFactory.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFunction.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/MinhashOptionCreator.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmClusterEvaluator.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmDataConverter.java

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFactory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFactory.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFactory.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFactory.java Fri Oct  1 07:40:53 2010
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.minhash;
+
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.vectors.MurmurHash;
+
+import java.util.Random;
+
+public class HashFactory {
+
+  private HashFactory() {
+  }
+
+  public enum HashType {
+    LINEAR, POLYNOMIAL, MURMUR
+  }
+
+  public static HashFunction[] createHashFunctions(HashType type, int numFunctions) {
+    HashFunction[] hashFunction = new HashFunction[numFunctions];
+    Random seed = new Random(11);
+    switch (type) {
+      case LINEAR:
+        for (int i = 0; i < numFunctions; i++) {
+          hashFunction[i] = new LinearHash(seed.nextInt(), seed.nextInt());
+        }
+        break;
+      case POLYNOMIAL:
+        for (int i = 0; i < numFunctions; i++) {
+          hashFunction[i] = new PolynomialHash(seed.nextInt(), seed.nextInt(), seed.nextInt());
+        }
+        break;
+      case MURMUR:
+        for (int i = 0; i < numFunctions; i++) {
+          hashFunction[i] = new MurmurHashWrapper(seed.nextInt());
+        }
+        break;
+    }
+    return hashFunction;
+  }
+
+  static class LinearHash implements HashFunction {
+    private final int seedA;
+    private final int seedB;
+
+    LinearHash(int seedA, int seedB) {
+      this.seedA = seedA;
+      this.seedB = seedB;
+    }
+
+    @Override
+    public int hash(byte[] bytes) {
+      long hashValue = 31;
+      for (byte byteVal : bytes) {
+        hashValue *= seedA * byteVal;
+        hashValue += seedB;
+      }
+      return Math.abs((int) (hashValue % RandomUtils.MAX_INT_SMALLER_TWIN_PRIME));
+    }
+  }
+
+  static class PolynomialHash implements HashFunction {
+    private final int seedA;
+    private final int seedB;
+    private final int seedC;
+
+    PolynomialHash(int seedA, int seedB, int seedC) {
+      this.seedA = seedA;
+      this.seedB = seedB;
+      this.seedC = seedC;
+    }
+
+    @Override
+    public int hash(byte[] bytes) {
+      long hashValue = 31;
+      for (byte byteVal : bytes) {
+        hashValue *= seedA * (byteVal >> 4);
+        hashValue += seedB * byteVal + seedC;
+      }
+      return Math.abs((int) (hashValue % RandomUtils.MAX_INT_SMALLER_TWIN_PRIME));
+    }
+  }
+
+  static class MurmurHashWrapper implements HashFunction {
+    private final int seed;
+
+    MurmurHashWrapper(int seed) {
+      this.seed = seed;
+    }
+
+    @Override
+    public int hash(byte[] bytes) {
+      long hashValue = MurmurHash.hash64A(bytes, seed);
+      return Math.abs((int) (hashValue % RandomUtils.MAX_INT_SMALLER_TWIN_PRIME));
+    }
+  }
+}
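
For quick orientation, a minimal usage sketch for the factory above (the class wrapper and printed output are illustrative scaffolding only; the big-endian byte packing mirrors what MinHashMapper does further down):

    package org.apache.mahout.clustering.minhash;

    // Illustrative only: exercise one hash family from HashFactory.
    public class HashFactoryExample {
      public static void main(String[] args) {
        HashFunction[] fns =
            HashFactory.createHashFunctions(HashFactory.HashType.MURMUR, 3);
        int value = 42;
        // Pack the int big-endian, as MinHashMapper does.
        byte[] bytes = {
            (byte) (value >> 24), (byte) (value >> 16),
            (byte) (value >> 8), (byte) value
        };
        for (HashFunction fn : fns) {
          // Each result lies in [0, RandomUtils.MAX_INT_SMALLER_TWIN_PRIME).
          System.out.println(fn.hash(bytes));
        }
      }
    }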

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFunction.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFunction.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFunction.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFunction.java Fri Oct  1 07:40:53 2010
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.minhash;
+
+public interface HashFunction {
+
+  int hash(byte[] bytes);
+
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java Fri Oct  1 07:40:53 2010
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.minhash;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.commandline.MinhashOptionCreator;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+public final class MinHashDriver extends AbstractJob {
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new MinHashDriver(), args);
+  }
+
+  private void runJob(Path input, 
+                      Path output,
+                      int minClusterSize,
+                      int minVectorSize, 
+                      String hashType, 
+                      int numHashFunctions, 
+                      int keyGroups,
+                      int numReduceTasks, 
+                      boolean debugOutput) throws IOException, ClassNotFoundException, InterruptedException {
+    Configuration conf = getConf();
+
+    conf.setInt(MinhashOptionCreator.MIN_CLUSTER_SIZE, minClusterSize);
+    conf.setInt(MinhashOptionCreator.MIN_VECTOR_SIZE, minVectorSize);
+    conf.set(MinhashOptionCreator.HASH_TYPE, hashType);
+    conf.setInt(MinhashOptionCreator.NUM_HASH_FUNCTIONS, numHashFunctions);
+    conf.setInt(MinhashOptionCreator.KEY_GROUPS, keyGroups);
+    conf.setBoolean(MinhashOptionCreator.DEBUG_OUTPUT, debugOutput);
+
+    Class<? extends Writable> outputClass = 
+        debugOutput ? VectorWritable.class : Text.class;
+    Class<? extends OutputFormat> outputFormatClass = 
+        debugOutput ? SequenceFileOutputFormat.class : TextOutputFormat.class;
+    
+    Job job = new Job(conf, "MinHash Clustering");
+    job.setJarByClass(MinHashDriver.class);
+
+    FileInputFormat.setInputPaths(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+
+    job.setMapperClass(MinHashMapper.class);
+    job.setReducerClass(MinHashReducer.class);
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(outputFormatClass);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(outputClass);
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(outputClass);
+
+    job.setNumReduceTasks(numReduceTasks);
+
+    job.waitForCompletion(true);
+  }
+
+  @Override
+  public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+    addInputOption();
+    addOutputOption();
+    addOption(MinhashOptionCreator.minClusterSizeOption().create());
+    addOption(MinhashOptionCreator.minVectorSizeOption().create());
+    addOption(MinhashOptionCreator.hashTypeOption().create());
+    addOption(MinhashOptionCreator.numHashFunctionsOption().create());
+    addOption(MinhashOptionCreator.keyGroupsOption().create());
+    addOption(MinhashOptionCreator.numReducersOption().create());
+    addOption(MinhashOptionCreator.debugOutputOption().create());
+
+    if (parseArguments(args) == null) {
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    int minClusterSize = Integer.parseInt(getOption(MinhashOptionCreator.MIN_CLUSTER_SIZE));
+    int minVectorSize = Integer.parseInt(getOption(MinhashOptionCreator.MIN_VECTOR_SIZE));
+    String hashType = getOption(MinhashOptionCreator.HASH_TYPE);
+    int numHashFunctions = Integer.parseInt(getOption(MinhashOptionCreator.NUM_HASH_FUNCTIONS));
+    int keyGroups = Integer.parseInt(getOption(MinhashOptionCreator.KEY_GROUPS));
+    int numReduceTasks = Integer.parseInt(getOption(MinhashOptionCreator.NUM_REDUCERS));
+    boolean debugOutput = Boolean.parseBoolean(getOption(MinhashOptionCreator.DEBUG_OUTPUT));
+
+    runJob(input,
+           output,
+           minClusterSize,
+           minVectorSize,
+           hashType,
+           numHashFunctions,
+           keyGroups,
+           numReduceTasks,
+           debugOutput);
+    return 0;
+  }
+}
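
For reference, a hypothetical programmatic invocation of this driver, modeled on the unit test further down; the paths are placeholders and the option long names come from MinhashOptionCreator (fragment; assumes the driver's own Configuration/ToolRunner imports):

    // Illustrative only: run the MinHash job with explicit arguments.
    String[] args = {
        "--input", "/path/to/input/vectors",
        "--output", "/path/to/minhash/output",
        "--minClusterSize", "10",
        "--minVectorSize", "5",
        "--hashType", "MURMUR",
        "--numHashFunctions", "10",
        "--keyGroups", "2",
        "--numReducers", "2",
        "--debugOutput", "false"
    };
    int exitCode = ToolRunner.run(new Configuration(), new MinHashDriver(), args);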

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java Fri Oct  1 07:40:53 2010
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.minhash;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.minhash.HashFactory.HashType;
+import org.apache.mahout.common.commandline.MinhashOptionCreator;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Locale;
+
+public class MinHashMapper extends Mapper<Text, Writable, Text, Writable> {
+
+  private static final Logger log = LoggerFactory.getLogger(MinHashMapper.class);
+
+  private HashFunction[] hashFunction;
+  private int numHashFunctions;
+  private int keyGroups;
+  private int minVectorSize;
+  private boolean debugOutput;
+  private int[] minHashValues;
+  private byte[] bytesToHash;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    this.numHashFunctions = conf.getInt(MinhashOptionCreator.NUM_HASH_FUNCTIONS, 10);
+    this.minHashValues = new int[numHashFunctions];
+    this.bytesToHash = new byte[4];
+    this.keyGroups = conf.getInt(MinhashOptionCreator.KEY_GROUPS, 1);
+    this.minVectorSize = conf.getInt(MinhashOptionCreator.MIN_VECTOR_SIZE, 5);
+    String htype = conf.get(MinhashOptionCreator.HASH_TYPE, "linear");
+    this.debugOutput = conf.getBoolean(MinhashOptionCreator.DEBUG_OUTPUT, false);
+
+    HashType hashType;
+    try {
+      hashType = HashType.valueOf(htype.toUpperCase(Locale.ENGLISH));
+    } catch (IllegalArgumentException iae) {
+      log.warn("No valid hash type found in configuration for {}, assuming type: {}", htype, HashType.LINEAR);
+      hashType = HashType.LINEAR;
+    }
+    hashFunction = HashFactory.createHashFunctions(hashType, numHashFunctions);
+  }
+
+  /**
+   * Hash all items with each hash function and retain the minimum value per
+   * function, ending up with one minhash signature per hash function.
+   * 
+   * Then, depending on the number of key-groups (1 - 4), concatenate that many
+   * minhash values to form the cluster-id, emitted as the 'key' with the
+   * item-id as the 'value'.
+   */
+  @Override
+  public void map(Text item, Writable features, Context context) throws IOException, InterruptedException {
+    Vector featureVector = ((VectorWritable) features).get();
+    if (featureVector.size() < minVectorSize) {
+      return;
+    }
+    // Initialize the minhash values to highest
+    for (int i = 0; i < numHashFunctions; i++) {
+      minHashValues[i] = Integer.MAX_VALUE;
+    }
+    int hashIndex = 0;
+    for (int i = 0; i < numHashFunctions; i++) {
+      for (Vector.Element ele : featureVector) {
+        int value = (int) ele.get();
+        bytesToHash[0] = (byte) (value >> 24);
+        bytesToHash[1] = (byte) (value >> 16);
+        bytesToHash[2] = (byte) (value >> 8);
+        bytesToHash[3] = (byte) (value);
+        hashIndex = hashFunction[i].hash(bytesToHash);
+        if (minHashValues[i] > hashIndex) {
+          minHashValues[i] = hashIndex;
+        }
+      }
+    }
+    // output the cluster information
+    for (int i = 0; i < numHashFunctions; i += keyGroups) {
+      StringBuilder clusterIdBuilder = new StringBuilder();
+      for (int j = 0; j < keyGroups && (i + j) < numHashFunctions; j++) {
+        clusterIdBuilder.append(minHashValues[i + j]).append('-');
+      }
+      String clusterId = clusterIdBuilder.toString();
+      clusterId = clusterId.substring(0, clusterId.lastIndexOf('-'));
+      Text cluster = new Text(clusterId);
+      Writable point;
+      if (debugOutput) {
+        point = new VectorWritable(featureVector.clone());
+      } else {
+        point = new Text(item.toString());
+      }
+      context.write(cluster, point);
+    }
+  }
+}
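
To make the signature and key-group construction concrete, a self-contained sketch of the mapper's core loop without the Hadoop plumbing (the five-element feature set is an arbitrary example):

    // Illustrative only: minhash signature plus key-group concatenation.
    int numHashFunctions = 4;
    int keyGroups = 2;
    int[] minHashValues = new int[numHashFunctions];
    java.util.Arrays.fill(minHashValues, Integer.MAX_VALUE);
    HashFunction[] fns =
        HashFactory.createHashFunctions(HashFactory.HashType.LINEAR, numHashFunctions);
    for (int value : new int[] {1, 2, 3, 4, 5}) {  // stand-in for one feature vector
      byte[] b = {
          (byte) (value >> 24), (byte) (value >> 16),
          (byte) (value >> 8), (byte) value
      };
      for (int i = 0; i < numHashFunctions; i++) {
        minHashValues[i] = Math.min(minHashValues[i], fns[i].hash(b));
      }
    }
    // With keyGroups = 2 this yields two cluster ids: "h0-h1" and "h2-h3".
    for (int i = 0; i < numHashFunctions; i += keyGroups) {
      StringBuilder clusterId = new StringBuilder();
      for (int j = 0; j < keyGroups && i + j < numHashFunctions; j++) {
        if (j > 0) {
          clusterId.append('-');
        }
        clusterId.append(minHashValues[i + j]);
      }
      System.out.println(clusterId);
    }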

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java Fri Oct  1 07:40:53 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.minhash;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.commandline.MinhashOptionCreator;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+public class MinHashReducer extends Reducer<Text, Writable, Text, Writable> {
+
+  private int minClusterSize;
+  private boolean debugOutput;
+
+  enum Clusters {
+    Accepted, Discarded
+  }
+  
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    this.minClusterSize = conf.getInt(MinhashOptionCreator.MIN_CLUSTER_SIZE, 5);
+    this.debugOutput = conf.getBoolean(MinhashOptionCreator.DEBUG_OUTPUT, false);
+  }
+
+  /**
+   * Emit the clustered items, but only for clusters meeting the minimum size.
+   */
+  @Override
+  protected void reduce(Text cluster, Iterable<Writable> points, Context context)
+    throws IOException, InterruptedException {
+    Collection<Writable> pointList = new ArrayList<Writable>();
+    for (Writable point : points) {
+      if (debugOutput) {
+        Vector pointVector = ((VectorWritable) point).get().clone();
+        Writable writablePointVector = new VectorWritable(pointVector);
+        pointList.add(writablePointVector);
+      } else {
+        Writable pointText = new Text(point.toString());
+        pointList.add(pointText);
+      }
+    }
+    if (pointList.size() >= minClusterSize) {
+      context.getCounter(Clusters.Accepted).increment(1);
+      for (Writable point : pointList) {
+        context.write(cluster, point);
+      }
+    } else {
+      context.getCounter(Clusters.Discarded).increment(1);
+    }
+  }
+
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/MinhashOptionCreator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/MinhashOptionCreator.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/MinhashOptionCreator.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/MinhashOptionCreator.java Fri Oct  1 07:40:53 2010
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.common.commandline;
+
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+
+public final class MinhashOptionCreator {
+
+  public static final String NUM_HASH_FUNCTIONS = "numHashFunctions";
+  public static final String KEY_GROUPS = "keyGroups";
+  public static final String HASH_TYPE = "hashType";
+  public static final String MIN_CLUSTER_SIZE = "minClusterSize";
+  public static final String MIN_VECTOR_SIZE = "minVectorSize";
+  public static final String NUM_REDUCERS = "numReducers";
+  public static final String DEBUG_OUTPUT = "debugOutput";
+
+  private MinhashOptionCreator() {
+  }
+
+  public static DefaultOptionBuilder debugOutputOption() {
+    return new DefaultOptionBuilder()
+        .withLongName(DEBUG_OUTPUT)
+        .withShortName("debug")
+        .withArgument(
+            new ArgumentBuilder().withName(DEBUG_OUTPUT).withDefault("false")
+                .withMinimum(1).withMaximum(1).create())
+        .withDescription("Cluster the whole vectors for debugging");
+  }
+
+  public static DefaultOptionBuilder numReducersOption() {
+    return new DefaultOptionBuilder()
+        .withLongName(NUM_REDUCERS)
+        .withRequired(false)
+        .withShortName("r")
+        .withArgument(
+            new ArgumentBuilder().withName(NUM_REDUCERS).withDefault("2")
+                .withMinimum(1).withMaximum(1).create())
+        .withDescription("The number of reduce tasks. Defaults to 2");
+  }
+
+  /**
+   * Returns a default command line option for specifying the minimum cluster
+   * size in MinHash clustering
+   */
+  public static DefaultOptionBuilder minClusterSizeOption() {
+    return new DefaultOptionBuilder()
+        .withLongName(MIN_CLUSTER_SIZE)
+        .withRequired(false)
+        .withArgument(
+            new ArgumentBuilder().withName(MIN_CLUSTER_SIZE).withDefault("10")
+                .withMinimum(1).withMaximum(1).create())
+        .withDescription("Minimum points inside a cluster")
+        .withShortName("mcs");
+  }
+
+  /**
+   * Returns a default command line option for specifying the type of hash to
+   * use in MinHash clustering. Should be one of
+   * ("linear", "polynomial", "murmur")
+   */
+  public static DefaultOptionBuilder hashTypeOption() {
+    return new DefaultOptionBuilder()
+        .withLongName(HASH_TYPE)
+        .withRequired(false)
+        .withArgument(
+            new ArgumentBuilder().withName(HASH_TYPE).withDefault("murmur")
+                .withMinimum(1).withMaximum(1).create())
+        .withDescription(
+            "Type of hash function to use. Available types: (linear, polynomial, murmur) ")
+        .withShortName("ht");
+  }
+
+  /**
+   * Returns a default command line option for specifying the minimum size of
+   * the vector to be hashed
+   */
+  public static DefaultOptionBuilder minVectorSizeOption() {
+    return new DefaultOptionBuilder()
+        .withLongName(MIN_VECTOR_SIZE)
+        .withRequired(false)
+        .withArgument(
+            new ArgumentBuilder().withName(MIN_VECTOR_SIZE).withDefault("5")
+                .withMinimum(1).withMaximum(1).create())
+        .withDescription("Minimum size of vector to be hashed")
+        .withShortName("mvs");
+  }
+
+  /**
+   * Returns a default command line option for specifying the number of hash
+   * functions to be used in MinHash clustering
+   */
+  public static DefaultOptionBuilder numHashFunctionsOption() {
+    return new DefaultOptionBuilder()
+        .withLongName(NUM_HASH_FUNCTIONS)
+        .withRequired(false)
+        .withArgument(
+            new ArgumentBuilder().withName(NUM_HASH_FUNCTIONS)
+                .withDefault("10").withMinimum(1).withMaximum(1).create())
+        .withDescription("Number of hash functions to be used")
+        .withShortName("nh");
+  }
+
+  /**
+   * Returns a default command line option for specifying the number of key
+   * groups to be used in MinHash clustering
+   */
+  public static DefaultOptionBuilder keyGroupsOption() {
+    return new DefaultOptionBuilder()
+        .withLongName(KEY_GROUPS)
+        .withRequired(false)
+        .withArgument(
+            new ArgumentBuilder().withName(KEY_GROUPS).withDefault("2")
+                .withMinimum(1).withMaximum(1).create())
+        .withDescription("Number of key groups to be used").withShortName("kg");
+  }
+}

Added: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java Fri Oct  1 07:40:53 2010
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.minhash;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.minhash.HashFactory.HashType;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.commandline.MinhashOptionCreator;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+public class TestMinHashClustering extends MahoutTestCase {
+
+  public static final double[][] REFERENCE = { { 1, 2, 3, 4, 5 },
+      { 2, 1, 3, 6, 7 }, { 3, 7, 6, 11, 8, 9 }, { 4, 7, 8, 9, 6, 1 },
+      { 5, 8, 10, 4, 1 }, { 6, 17, 14, 15 }, { 8, 9, 11, 6, 12, 1, 7 },
+      { 10, 13, 9, 7, 4, 6, 3 }, { 3, 5, 7, 9, 2, 11 }, { 13, 7, 6, 8, 5 } };
+
+  private FileSystem fs;
+  private Path input;
+  private Path output;
+
+  public static List<VectorWritable> getPointsWritable(double[][] raw) {
+    List<VectorWritable> points = new ArrayList<VectorWritable>();
+    for (double[] fr : raw) {
+      Vector vec = new SequentialAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = new Configuration();
+    fs = FileSystem.get(conf);
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+    input = getTestTempDirPath("points");
+    output = new Path(getTestTempDirPath(), "output");
+    Path pointFile = new Path(input, "file1");
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, pointFile, Text.class, VectorWritable.class);
+    int id = 0;
+    for (VectorWritable point : points) {
+      writer.append(new Text("Id-" + id++), point);
+    }
+    writer.close();
+  }
+
+  private String[] makeArguments(int minClusterSize,
+                                 int minVectorSize,
+                                 int numHashFunctions,
+                                 int keyGroups,
+                                 String hashType){
+    return new String[] {
+        optKey(DefaultOptionCreator.INPUT_OPTION), input.toString(),
+        optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+        optKey(MinhashOptionCreator.MIN_CLUSTER_SIZE), String.valueOf(minClusterSize),
+        optKey(MinhashOptionCreator.MIN_VECTOR_SIZE), String.valueOf(minVectorSize),
+        optKey(MinhashOptionCreator.HASH_TYPE), hashType,
+        optKey(MinhashOptionCreator.NUM_HASH_FUNCTIONS), String.valueOf(numHashFunctions),
+        optKey(MinhashOptionCreator.KEY_GROUPS), String.valueOf(keyGroups),
+        optKey(MinhashOptionCreator.NUM_REDUCERS), "1",
+        optKey(MinhashOptionCreator.DEBUG_OUTPUT), "true"};
+  }
+
+  private static Set<Integer> getValues(Vector vector) {
+    Iterator<Vector.Element> itr = vector.iterator();
+    Set<Integer> values = new HashSet<Integer>();
+    while (itr.hasNext()) {
+      values.add((int) itr.next().get());
+    }
+    return values;
+  }
+
+  private void verify(Path output) throws Exception {
+    Configuration conf = new Configuration();
+    Path outputFile = new Path(output, "part-r-00000");
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, outputFile, conf);
+    Writable clusterId = new Text();
+    VectorWritable point = new VectorWritable();
+    List<Vector> clusteredItems = new ArrayList<Vector>();
+    String prevClusterId = "";
+    while (reader.next(clusterId, point)) {
+      if (prevClusterId.equals(clusterId.toString())) {
+        clusteredItems.add(point.get().clone());
+      } else {
+        if (clusteredItems.size() > 1) {
+          // run pair-wise similarity test on items in a cluster
+          for (int i = 0; i < clusteredItems.size(); i++) {
+            Set<Integer> itemSet1 = getValues(clusteredItems.get(i));
+            for (int j = i + 1; j < clusteredItems.size(); j++) {
+              Set<Integer> itemSet2 = getValues(clusteredItems.get(j));
+              Set<Integer> union = new HashSet<Integer>();
+              union.addAll(itemSet1);
+              union.addAll(itemSet2);
+              Collection<Integer> intersect = new HashSet<Integer>();
+              intersect.addAll(itemSet1);
+              intersect.retainAll(itemSet2);
+              double similarity = intersect.size() / (double) union.size();
+              assertTrue("Sets failed min similarity test, Set1: "
+                  + itemSet1 + " Set2: " + itemSet2 + ", similarity:" + similarity, similarity > 0.4);
+            }
+          }
+        }
+        clusteredItems.clear();
+        prevClusterId = clusterId.toString();
+      }
+    }
+    reader.close();
+  }
+
+  @Test
+  public void testLinearMinHashMRJob() throws Exception {
+    String[] args = makeArguments(2, 3, 20, 4, HashType.LINEAR.toString());
+    int ret = ToolRunner.run(new Configuration(), new MinHashDriver(), args);
+    assertEquals("Minhash MR Job failed for " + HashType.LINEAR.toString(), 0, ret);
+    System.out.println("Verifying linear hash results");
+    verify(output);
+  }
+
+  @Test
+  public void testPolynomialMinHashMRJob() throws Exception {
+    String[] args = makeArguments(2, 3, 20, 4, HashType.POLYNOMIAL.toString());
+    int ret = ToolRunner.run(new Configuration(), new MinHashDriver(), args);
+    assertEquals("Minhash MR Job failed for " + HashType.POLYNOMIAL.toString(), 0, ret);
+    System.out.println("Verifying linear hash results");
+    verify(output);
+  }
+
+  @Test
+  public void testMurmurMinHashMRJob() throws Exception {
+    String[] args = makeArguments(2, 3, 20, 4, HashType.MURMUR.toString());
+    int ret = ToolRunner.run(new Configuration(), new MinHashDriver(), args);
+    assertEquals("Minhash MR Job failed for " + HashType.MURMUR.toString(), 0, ret);
+    System.out.println("verifying murmur hash results");
+    verify(output);
+  }
+
+}

Added: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmClusterEvaluator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmClusterEvaluator.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmClusterEvaluator.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmClusterEvaluator.java Fri Oct  1 07:40:53 2010
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.minhash;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.HashSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+public final class LastfmClusterEvaluator {
+
+  private LastfmClusterEvaluator() {
+  }
+
+  /* Calculate used JVM memory */
+  private static String usedMemory() {
+    Runtime runtime = Runtime.getRuntime();
+    return "Used Memory: [" + ((runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024)) + " MB] ";
+  }
+
+  /**
+   * Compute the Jaccard coefficient over two sets: |A intersect B| / |A union B|
+   */
+  private static double computeSimilarity(Iterable<Integer> listenerVector1, Iterable<Integer> listenerVector2) {
+    Set<Integer> first = new HashSet<Integer>();
+    for (Integer ele : listenerVector1) {
+      first.add(ele);
+    }
+    Set<Integer> second = new HashSet<Integer>();
+    for (Integer ele : listenerVector2) {
+      second.add(ele);
+    }
+
+    Set<Integer> intersection = new HashSet<Integer>(first);
+    intersection.retainAll(second);
+    double intersectSize = intersection.size();
+
+    first.addAll(second);
+    double unionSize = first.size();
+    return (unionSize == 0) ? 0.0 : (intersectSize / unionSize);
+  }
+
+  /**
+   * Calculate the overall cluster precision by sampling clusters. Precision is
+   * calculated as follows:
+   * 
+   * 1. For a sample of all the clusters, calculate the pair-wise similarity
+   * (Jaccard coefficient) for items in the same cluster.
+   * 
+   * 2. Count true positives as items whose similarity is above the specified
+   * threshold.
+   * 
+   * 3. Precision = (true positives) / (total items in clusters sampled).
+   * 
+   * @param clusterFile
+   *          The file containing cluster information
+   * @param threshold
+   *          Similarity threshold above which two items in a cluster are
+   *          considered relevant. Must be between 0.0 and 1.0
+   * @param samplePercentage
+   *          Percentage of clusters to sample. Must be between 0.0 and 1.0
+   */
+  private static void testPrecision(Path clusterFile, double threshold, double samplePercentage) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, clusterFile, conf);
+    Random rand = new Random(11);
+    Writable cluster = new Text();
+    Text prevCluster = new Text();
+    VectorWritable point = new VectorWritable();
+    List<List<Integer>> listenerVectors = new ArrayList<List<Integer>>();
+    long similarListeners = 0;
+    long allListeners = 0;
+    int clustersProcessed = 0;
+    while (reader.next(cluster, point)) {
+      if (!cluster.equals(prevCluster)) {
+        // We got a new cluster
+        prevCluster.set(cluster.toString());
+        // Should we evaluate the previous cluster?
+        if (rand.nextDouble() > samplePercentage) {
+          listenerVectors.clear();
+          continue;
+        }
+        int numListeners = listenerVectors.size();
+        allListeners += numListeners;
+        for (int i = 0; i < numListeners; i++) {
+          List<Integer> listenerVector1 = listenerVectors.get(i);
+          for (int j = i + 1; j < numListeners; j++) {
+            List<Integer> listenerVector2 = listenerVectors.get(j);
+            double similarity = computeSimilarity(listenerVector1,
+                listenerVector2);
+            similarListeners += (similarity >= threshold) ? 1 : 0;
+          }
+        }
+        listenerVectors.clear();
+        clustersProcessed++;
+        System.out.print('\r' + usedMemory() + " Clusters processed: " + clustersProcessed);
+      }
+      List<Integer> listeners = new ArrayList<Integer>();
+      for (Vector.Element ele : point.get()) {
+        listeners.add((int) ele.get());
+      }
+      listenerVectors.add(listeners);
+    }
+    System.out.println("\nTest Results");
+    System.out.println("=============");
+    System.out.println(" (A) Listeners in same cluster with simiarity above threshold ("
+ + threshold + ") : " + similarListeners);
+    System.out.println(" (B) All listeners: " + allListeners);
+    NumberFormat format = NumberFormat.getInstance();
+    format.setMaximumFractionDigits(2);
+    double precision = ((double) similarListeners / allListeners) * 100.0;
+    System.out.println(" Average cluster precision: A/B = " + format.format(precision));
+  }
+
+  public static void main(String[] args) throws IOException {
+    if (args.length < 3) {
+      System.out.println("LastfmClusterEvaluation <cluster-file> <threshold> <sample-percentage>");
+      System.out.println("      <cluster-file>: Absolute Path of file containing cluster information in DEBUG format");
+      System.out.println("         <threshold>: Minimum threshold for jaccard co-efficient for considering two items");
+      System.out.println("                      in a cluster to be really similar. Should be between 0.0 and 1.0");
+      System.out.println(" <sample-percentage>: Percentage of clusters to sample. Should be between 0.0 and 1.0");
+      return;
+    }
+    Path clusterFile = new Path(args[0]);
+    double threshold = Double.parseDouble(args[1]);
+    double samplePercentage = Double.parseDouble(args[2]);
+    testPrecision(clusterFile, threshold, samplePercentage);
+  }
+
+}
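
For reference, the Jaccard computation inside computeSimilarity(), worked on two small example sets (fragment; assumes java.util.{Arrays, HashSet, Set} imports):

    // Illustrative only: Jaccard coefficient of {1,2,3,4} and {3,4,5}.
    Set<Integer> a = new HashSet<Integer>(Arrays.asList(1, 2, 3, 4));
    Set<Integer> b = new HashSet<Integer>(Arrays.asList(3, 4, 5));
    Set<Integer> intersection = new HashSet<Integer>(a);
    intersection.retainAll(b);                    // {3, 4}
    Set<Integer> union = new HashSet<Integer>(a);
    union.addAll(b);                              // {1, 2, 3, 4, 5}
    double jaccard = (double) intersection.size() / union.size();  // 2/5 = 0.4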

Added: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmDataConverter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmDataConverter.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmDataConverter.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmDataConverter.java Fri Oct  1 07:40:53 2010
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.minhash;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+public final class LastfmDataConverter {
+
+  private static final Pattern TAB_PATTERN = Pattern.compile("\t");
+
+  // We are clustering similar items from the following dataset:
+  // http://www.iua.upf.es/~ocelma/MusicRecommendationDataset/index.html
+  //
+  // Preparing the data set means getting it into a format which can be read
+  // by the minhash algorithm.
+  //
+  enum Lastfm {
+    USERS_360K(17559530),
+    USERS_1K(19150868);
+    private final int totalRecords;
+    Lastfm(int totalRecords) {
+      this.totalRecords = totalRecords;
+    }
+  }
+
+  private LastfmDataConverter() {
+  }
+
+  private static String usedMemory() {
+    Runtime runtime = Runtime.getRuntime();
+    return "Used Memory: [" + ((runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024)) + " MB] ";
+  }
+
+  /* Get the feature from the parsed record */
+  private static String getFeature(String[] fields, Lastfm dataSet) {
+    if (dataSet == Lastfm.USERS_360K) {
+      return fields[0];
+    } else {
+      return fields[2];
+    }
+  }
+
+  /* Get the item from the parsed record */
+  private static String getItem(String[] fields, Lastfm dataSet) {
+    if (dataSet == Lastfm.USERS_360K) {
+      return fields[2];
+    } else {
+      return fields[0];
+    }
+  }
+
+  /**
+   * Reads the LastFm dataset and constructs a Map of (item, features).
+   * For the 360K Users dataset: (Item=Artist, Feature=User).
+   * For the 1K Users dataset: (Item=User, Feature=Artist).
+   * 
+   * @param inputFile
+   *          Lastfm dataset file on the local file system.
+   * @param dataSet
+   *          Type of dataset - 360K Users or 1K Users
+   * @return Map of (item, list of feature indexes) for all items in the dataset
+   */
+  public static Map<String, List<Integer>> convertToItemFeatures(String inputFile, Lastfm dataSet) throws IOException {
+    long totalRecords = dataSet.totalRecords;
+    Map<String, Integer> featureIdxMap = new HashMap<String, Integer>();
+    Map<String, List<Integer>> itemFeaturesMap = new HashMap<String, List<Integer>>();
+    String msg = usedMemory() + "Converting data to internal vector format: ";
+    BufferedReader br = new BufferedReader(new FileReader(inputFile));
+    try {
+      System.out.print(msg);
+      int prevPercentDone = 1;
+      double percentDone = 0.0;
+      long parsedRecords = 0;
+      String line;
+      while ((line = br.readLine()) != null) {
+        String[] fields = TAB_PATTERN.split(line);
+        String feature = getFeature(fields, dataSet);
+        String item = getItem(fields, dataSet);
+        // get the featureIdx
+        Integer featureIdx = featureIdxMap.get(feature);
+        if (featureIdx == null) {
+          featureIdx = featureIdxMap.size() + 1;
+          featureIdxMap.put(feature, featureIdx);
+        }
+        // add it to the corresponding feature idx map
+        List<Integer> features = itemFeaturesMap.get(item);
+        if (features == null) {
+          features = new ArrayList<Integer>();
+          itemFeaturesMap.put(item, features);
+        }
+        features.add(featureIdx);
+        parsedRecords++;
+        // Update the progress
+        percentDone = (parsedRecords * 100) / totalRecords;
+        msg = usedMemory() + "Converting data to internal vector format: ";
+        if (percentDone > prevPercentDone) {
+          System.out.print('\r' + msg + percentDone + '%');
+          prevPercentDone++;
+        }
+      }
+      msg = usedMemory() + "Converting data to internal vector format: ";
+      System.out.print('\r' + msg + percentDone + "% Completed\n");
+    } finally {
+      br.close();
+    }
+    return itemFeaturesMap;
+  }
+
+  /**
+   * Converts each record in (item,features) map into Mahout vector format and
+   * writes it into sequencefile for minhash clustering
+   */
+  public static boolean writeToSequenceFile(Map<String, List<Integer>> itemFeaturesMap, Path outputPath)
+    throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    fs.mkdirs(outputPath.getParent());
+    long totalRecords = itemFeaturesMap.size();
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, outputPath, Text.class, VectorWritable.class);
+    try {
+      String msg = "Now writing vectorized data in sequence file format: ";
+      System.out.print(msg);
+
+      Text itemWritable = new Text();
+      VectorWritable featuresWritable = new VectorWritable();
+
+      int doneRecords = 0;
+      int prevPercentDone = 1;
+
+      for (Map.Entry<String, List<Integer>> itemFeature : itemFeaturesMap.entrySet()) {
+        int numfeatures = itemFeature.getValue().size();
+        itemWritable.set(itemFeature.getKey());
+        Vector featureVector = new SequentialAccessSparseVector(numfeatures);
+        int i = 0;
+        for (Integer feature : itemFeature.getValue()) {
+          featureVector.setQuick(i++, feature);
+        }
+        featuresWritable.set(featureVector);
+        writer.append(itemWritable, featuresWritable);
+        // Update the progress
+        double percentDone = ((++doneRecords) * 100) / totalRecords;
+        if (percentDone > prevPercentDone) {
+          System.out.print('\r' + msg + percentDone + "% " + ((percentDone >= 100) ? "Completed\n" : ""));
+          prevPercentDone++;
+        }
+      }
+    } finally {
+      writer.close();
+    }
+    return true;
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.out.println("[Usage]: LastfmDataConverter <input> <output> <dataset>");
+      System.out.println("   <input>: Absolute path to the local file [usersha1-artmbid-artname-plays.tsv] ");
+      System.out.println("  <output>: Absolute path to the HDFS output file");
+      System.out.println(" <dataset>: Either of the two Lastfm public datasets. "
+          + "Must be either 'Users360K' or 'Users1K'");
+      System.out.println("Note:- Hadoop configuration pointing to HDFS namenode should be in classpath");
+      return;
+    }
+    Lastfm dataSet = Lastfm.valueOf(args[2]);
+    Map<String, List<Integer>> itemFeatures = convertToItemFeatures(args[0],
+        dataSet);
+    if (itemFeatures.isEmpty()) {
+      throw new IllegalStateException("Error converting the data file: [" + args[0] + ']');
+    }
+    Path output = new Path(args[1]);
+    boolean status = writeToSequenceFile(itemFeatures, output);
+    if (status) {
+      System.out.println("Data converted and written successfully to HDFS location: [" + output + ']');
+    } else {
+      System.err.println("Error writing the converted data to HDFS location: [" + output + ']');
+    }
+  }
+}
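
To illustrate the conversion for the 360K Users dataset (the field values below are made up; the layout follows the usersha1-artmbid-artname-plays.tsv naming in the usage text): given the tab-separated line

    00a1b2c3 <TAB> a74b1b7f-71a5-4011-9441-d0b5e4122711 <TAB> radiohead <TAB> 137

getItem() returns "radiohead" (fields[2]) and getFeature() returns the listener hash "00a1b2c3" (fields[0]), so each artist accumulates one feature index per listener record; for the 1K Users dataset the item/feature roles are reversed.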