You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this copy.
Posted to commits@mahout.apache.org by ss...@apache.org on 2013/06/09 22:20:44 UTC

svn commit: r1491279 - in /mahout/trunk: ./ core/src/main/java/org/apache/mahout/clustering/minhash/ core/src/test/java/org/apache/mahout/clustering/minhash/

Author: ssc
Date: Sun Jun  9 20:20:43 2013
New Revision: 1491279

URL: http://svn.apache.org/r1491279
Log:
MAHOUT-1251 Optimize MinHashMapper

Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinhashOptionCreator.java
Modified:
    mahout/trunk/CHANGELOG
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java

Modified: mahout/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1491279&r1=1491278&r2=1491279&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Sun Jun  9 20:20:43 2013
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 0.8 - unreleased
 
+  MAHOUT-1251: Optimize MinHashMapper (ssc)
+
   MAHOUT-1211: Disabled swallowing of IOExceptions is Closeables.close for writers (dfilimon)
 
   MAHOUT-1164: Make ARFF integration generate meta-data in JSON format (Marty Kube via ssc)

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java?rev=1491279&r1=1491278&r2=1491279&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java Sun Jun  9 20:20:43 2013
@@ -33,6 +33,17 @@ import org.apache.mahout.math.VectorWrit
 
 public final class MinHashDriver extends AbstractJob {
 
+  public static final String NUM_HASH_FUNCTIONS = "numHashFunctions";
+  public static final String KEY_GROUPS = "keyGroups";
+  public static final String HASH_TYPE = "hashType";
+  public static final String MIN_CLUSTER_SIZE = "minClusterSize";
+  public static final String MIN_VECTOR_SIZE = "minVectorSize";
+  public static final String NUM_REDUCERS = "numReducers";
+  public static final String DEBUG_OUTPUT = "debugOutput";
+  public static final String VECTOR_DIMENSION_TO_HASH  = "vectorDimensionToHash";
+
+  static final String HASH_DIMENSION_VALUE = "value";
+
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new MinHashDriver(), args);
   }
@@ -41,14 +52,18 @@ public final class MinHashDriver extends
   public int run(String[] args) throws Exception {
     addInputOption();
     addOutputOption();
-    addOption(MinhashOptionCreator.minClusterSizeOption().create());
-    addOption(MinhashOptionCreator.minVectorSizeOption().create());
-    addOption(MinhashOptionCreator.vectorDimensionToHashOption().create());
-    addOption(MinhashOptionCreator.hashTypeOption().create());
-    addOption(MinhashOptionCreator.numHashFunctionsOption().create());
-    addOption(MinhashOptionCreator.keyGroupsOption().create());
-    addOption(MinhashOptionCreator.numReducersOption().create());
-    addOption(MinhashOptionCreator.debugOutputOption().create());
+
+
+    addOption(MIN_CLUSTER_SIZE, "mcs", "Minimum points inside a cluster", String.valueOf(10));
+    addOption(MIN_VECTOR_SIZE, "mvs", "Minimum size of vector to be hashed", String.valueOf(5));
+    addOption(VECTOR_DIMENSION_TO_HASH, "vdh", "Dimension of vector to hash. Available types: (value, index). " +
+        "Defaults to 'value'", HASH_DIMENSION_VALUE);
+    addOption(HASH_TYPE, "ht", "Type of hash function to use. Available types: (linear, polynomial, murmur) ",
+        HashFactory.HashType.MURMUR.toString());
+    addOption(NUM_HASH_FUNCTIONS, "nh", "Number of hash functions to be used", String.valueOf(10));
+    addOption(KEY_GROUPS, "kg", "Number of key groups to be used", String.valueOf(2));
+    addOption(NUM_REDUCERS, "nr", "The number of reduce tasks. Defaults to 2", String.valueOf(2));
+    addFlag(DEBUG_OUTPUT, "debug", "Output the whole vectors for debugging");
     addOption(DefaultOptionCreator.overwriteOption().create());
 
     if (parseArguments(args) == null) {
@@ -59,14 +74,21 @@ public final class MinHashDriver extends
       HadoopUtil.delete(getConf(), getOutputPath());
     }
 
-    int minClusterSize = Integer.valueOf(getOption(MinhashOptionCreator.MIN_CLUSTER_SIZE));
-    int minVectorSize = Integer.valueOf(getOption(MinhashOptionCreator.MIN_VECTOR_SIZE));
-    String dimensionToHash = getOption(MinhashOptionCreator.VECTOR_DIMENSION_TO_HASH);
-    String hashType = getOption(MinhashOptionCreator.HASH_TYPE);
-    int numHashFunctions = Integer.valueOf(getOption(MinhashOptionCreator.NUM_HASH_FUNCTIONS));
-    int keyGroups = Integer.valueOf(getOption(MinhashOptionCreator.KEY_GROUPS));
-    int numReduceTasks = Integer.parseInt(getOption(MinhashOptionCreator.NUM_REDUCERS));
-    boolean debugOutput = hasOption(MinhashOptionCreator.DEBUG_OUTPUT);
+    int minClusterSize = Integer.valueOf(getOption(MIN_CLUSTER_SIZE));
+    int minVectorSize = Integer.valueOf(getOption(MIN_VECTOR_SIZE));
+    String dimensionToHash = getOption(VECTOR_DIMENSION_TO_HASH);
+    String hashType = getOption(HASH_TYPE);
+    int numHashFunctions = Integer.valueOf(getOption(NUM_HASH_FUNCTIONS));
+    int keyGroups = Integer.valueOf(getOption(KEY_GROUPS));
+    int numReduceTasks = Integer.parseInt(getOption(NUM_REDUCERS));
+    boolean debugOutput = hasOption(DEBUG_OUTPUT);
+
+    try {
+      HashFactory.HashType.valueOf(hashType);
+    } catch (IllegalArgumentException e) {
+      System.err.println("Unknown hashType: " + hashType);
+      return -1;
+    }
 
     Class<? extends Writable> outputClass = debugOutput ? VectorWritable.class : Text.class;
     Class<? extends OutputFormat> outputFormatClass =
@@ -76,13 +98,13 @@ public final class MinHashDriver extends
             Text.class, outputClass, MinHashReducer.class, Text.class, VectorWritable.class, outputFormatClass);
 
     Configuration minHashConfiguration = minHash.getConfiguration();
-    minHashConfiguration.setInt(MinhashOptionCreator.MIN_CLUSTER_SIZE, minClusterSize);
-    minHashConfiguration.setInt(MinhashOptionCreator.MIN_VECTOR_SIZE, minVectorSize);
-    minHashConfiguration.set(MinhashOptionCreator.VECTOR_DIMENSION_TO_HASH, dimensionToHash);
-    minHashConfiguration.set(MinhashOptionCreator.HASH_TYPE, hashType);
-    minHashConfiguration.setInt(MinhashOptionCreator.NUM_HASH_FUNCTIONS, numHashFunctions);
-    minHashConfiguration.setInt(MinhashOptionCreator.KEY_GROUPS, keyGroups);
-    minHashConfiguration.setBoolean(MinhashOptionCreator.DEBUG_OUTPUT, debugOutput);
+    minHashConfiguration.setInt(MIN_CLUSTER_SIZE, minClusterSize);
+    minHashConfiguration.setInt(MIN_VECTOR_SIZE, minVectorSize);
+    minHashConfiguration.set(VECTOR_DIMENSION_TO_HASH, dimensionToHash);
+    minHashConfiguration.set(HASH_TYPE, hashType);
+    minHashConfiguration.setInt(NUM_HASH_FUNCTIONS, numHashFunctions);
+    minHashConfiguration.setInt(KEY_GROUPS, keyGroups);
+    minHashConfiguration.setBoolean(DEBUG_OUTPUT, debugOutput);
     minHash.setNumReduceTasks(numReduceTasks);
 
     boolean succeeded = minHash.waitForCompletion(true);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java?rev=1491279&r1=1491278&r2=1491279&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java Sun Jun  9 20:20:43 2013
@@ -24,15 +24,11 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.mahout.clustering.minhash.HashFactory.HashType;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
 public class MinHashMapper extends Mapper<Text, VectorWritable, Text, Writable> {
 
-  private static final Logger log = LoggerFactory.getLogger(MinHashMapper.class);
-
   private HashFunction[] hashFunction;
   private int numHashFunctions;
   private int keyGroups;
@@ -40,28 +36,26 @@ public class MinHashMapper extends Mappe
   private boolean debugOutput;
   private int[] minHashValues;
   private byte[] bytesToHash;
-  private String dimensionToHash;
+  private boolean hashValue;
+
+  private Text cluster = new Text();
+  private VectorWritable vector = new VectorWritable();
 
   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
     super.setup(context);
     Configuration conf = context.getConfiguration();
-    this.dimensionToHash = conf.get(MinhashOptionCreator.VECTOR_DIMENSION_TO_HASH, "value");
-    this.numHashFunctions = conf.getInt(MinhashOptionCreator.NUM_HASH_FUNCTIONS, 10);
-    this.minHashValues = new int[numHashFunctions];
-    this.bytesToHash = new byte[4];
-    this.keyGroups = conf.getInt(MinhashOptionCreator.KEY_GROUPS, 1);
-    this.minVectorSize = conf.getInt(MinhashOptionCreator.MIN_VECTOR_SIZE, 5);
-    String htype = conf.get(MinhashOptionCreator.HASH_TYPE, "linear");
-    this.debugOutput = conf.getBoolean(MinhashOptionCreator.DEBUG_OUTPUT, false);
-
-    HashType hashType;
-    try {
-      hashType = HashType.valueOf(htype);
-    } catch (IllegalArgumentException iae) {
-      log.warn("No valid hash type found in configuration for {}, assuming type: {}", htype, HashType.LINEAR);
-      hashType = HashType.LINEAR;
-    }
+    numHashFunctions = conf.getInt(MinHashDriver.NUM_HASH_FUNCTIONS, 10);
+    minHashValues = new int[numHashFunctions];
+    bytesToHash = new byte[4];
+    keyGroups = conf.getInt(MinHashDriver.KEY_GROUPS, 1);
+    minVectorSize = conf.getInt(MinHashDriver.MIN_VECTOR_SIZE, 5);
+    debugOutput = conf.getBoolean(MinHashDriver.DEBUG_OUTPUT, false);
+
+    String dimensionToHash = conf.get(MinHashDriver.VECTOR_DIMENSION_TO_HASH);
+    hashValue = MinHashDriver.HASH_DIMENSION_VALUE.equalsIgnoreCase(dimensionToHash);
+
+    HashType hashType = HashType.valueOf(conf.get(MinHashDriver.HASH_TYPE));
     hashFunction = HashFactory.createHashFunctions(hashType, numHashFunctions);
   }
 
@@ -85,7 +79,7 @@ public class MinHashMapper extends Mappe
 
     for (int i = 0; i < numHashFunctions; i++) {
       for (Vector.Element ele : featureVector.nonZeroes()) {
-        int value = "value".equalsIgnoreCase(dimensionToHash) ? (int) ele.get() : ele.index();
+        int value = hashValue ? (int) ele.get() : ele.index();
         bytesToHash[0] = (byte) (value >> 24);
         bytesToHash[1] = (byte) (value >> 16);
         bytesToHash[2] = (byte) (value >> 8);
@@ -105,14 +99,15 @@ public class MinHashMapper extends Mappe
       }
       //remove the last dash
       clusterIdBuilder.deleteCharAt(clusterIdBuilder.length() - 1);
-      Text cluster = new Text(clusterIdBuilder.toString());
-      Writable point;
+
+      cluster.set(clusterIdBuilder.toString());
+
       if (debugOutput) {
-        point = new VectorWritable(featureVector.clone());
+        vector.set(featureVector);
+        context.write(cluster, vector);
       } else {
-        point = new Text(item.toString());
+        context.write(cluster, item);
       }
-      context.write(cluster, point);
     }
   }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java?rev=1491279&r1=1491278&r2=1491279&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java Sun Jun  9 20:20:43 2013
@@ -42,8 +42,8 @@ public class MinHashReducer extends Redu
   protected void setup(Context context) throws IOException, InterruptedException {
     super.setup(context);
     Configuration conf = context.getConfiguration();
-    this.minClusterSize = conf.getInt(MinhashOptionCreator.MIN_CLUSTER_SIZE, 5);
-    this.debugOutput = conf.getBoolean(MinhashOptionCreator.DEBUG_OUTPUT, false);
+    minClusterSize = conf.getInt(MinHashDriver.MIN_CLUSTER_SIZE, 5);
+    debugOutput = conf.getBoolean(MinHashDriver.DEBUG_OUTPUT, false);
   }
   
   /**

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java?rev=1491279&r1=1491278&r2=1491279&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java Sun Jun  9 20:20:43 2013
@@ -70,8 +70,9 @@ public final class TestMinHashClustering
     output = new Path(getTestTempDirPath(), "output");
     Path pointFile = new Path(input, "file1");
     FileSystem fs = FileSystem.get(pointFile.toUri(), conf);
-    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, pointFile, Text.class, VectorWritable.class);
+    SequenceFile.Writer writer = null;
     try {
+      writer = new SequenceFile.Writer(fs, conf, pointFile, Text.class, VectorWritable.class);
       int id = 0;
       for (VectorWritable point : points) {
         writer.append(new Text("Id-" + id++), point);
@@ -88,14 +89,14 @@ public final class TestMinHashClustering
                                  String hashType) {
     return new String[] {optKey(DefaultOptionCreator.INPUT_OPTION), input.toString(),
                          optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
-                         optKey(MinhashOptionCreator.VECTOR_DIMENSION_TO_HASH), dimensionToHash,
-                         optKey(MinhashOptionCreator.MIN_CLUSTER_SIZE), String.valueOf(minClusterSize),
-                         optKey(MinhashOptionCreator.MIN_VECTOR_SIZE), String.valueOf(minVectorSize),
-                         optKey(MinhashOptionCreator.HASH_TYPE), hashType,
-                         optKey(MinhashOptionCreator.NUM_HASH_FUNCTIONS), String.valueOf(numHashFunctions),
-                         optKey(MinhashOptionCreator.KEY_GROUPS), String.valueOf(keyGroups),
-                         optKey(MinhashOptionCreator.NUM_REDUCERS), "1",
-                         optKey(MinhashOptionCreator.DEBUG_OUTPUT)};
+                         optKey(MinHashDriver.VECTOR_DIMENSION_TO_HASH), dimensionToHash,
+                         optKey(MinHashDriver.MIN_CLUSTER_SIZE), String.valueOf(minClusterSize),
+                         optKey(MinHashDriver.MIN_VECTOR_SIZE), String.valueOf(minVectorSize),
+                         optKey(MinHashDriver.HASH_TYPE), hashType,
+                         optKey(MinHashDriver.NUM_HASH_FUNCTIONS), String.valueOf(numHashFunctions),
+                         optKey(MinHashDriver.KEY_GROUPS), String.valueOf(keyGroups),
+                         optKey(MinHashDriver.NUM_REDUCERS), String.valueOf(1),
+                         optKey(MinHashDriver.DEBUG_OUTPUT)};
   }
   
   private static Set<Integer> getValues(Vector vector, String dimensionToHash) {
@@ -152,7 +153,15 @@ public final class TestMinHashClustering
     }
     runPairwiseSimilarity(clusteredItems, simThreshold, dimensionToHash, msg);
   }
-  
+
+
+  @Test
+  public void testFailOnNonExistingHashType() throws Exception {
+    String[] args = makeArguments("value", 2, 3, 20, 4, "xKrot37");
+    int ret = ToolRunner.run(getConfiguration(), new MinHashDriver(), args);
+    assertEquals(-1, ret);
+  }
+
   @Test
   public void testLinearMinHashMRJob() throws Exception {
     String[] args = makeArguments("value", 2, 3, 20, 4, HashType.LINEAR.toString());