You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2010/10/01 09:40:53 UTC
svn commit: r1003415 - in /mahout/trunk:
core/src/main/java/org/apache/mahout/clustering/minhash/
core/src/main/java/org/apache/mahout/common/commandline/
core/src/test/java/org/apache/mahout/clustering/minhash/
examples/src/main/java/org/apache/mahout...
Author: srowen
Date: Fri Oct 1 07:40:53 2010
New Revision: 1003415
URL: http://svn.apache.org/viewvc?rev=1003415&view=rev
Log:
MAHOUT-344
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFactory.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFunction.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/MinhashOptionCreator.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmClusterEvaluator.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmDataConverter.java
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFactory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFactory.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFactory.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFactory.java Fri Oct 1 07:40:53 2010
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.minhash;
+
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.vectors.MurmurHash;
+
+import java.util.Random;
+
+public class HashFactory {
+
+ private HashFactory() {
+ }
+
+ public enum HashType {
+ LINEAR, POLYNOMIAL, MURMUR
+ }
+
+ public static HashFunction[] createHashFunctions(HashType type, int numFunctions) {
+ HashFunction[] hashFunction = new HashFunction[numFunctions];
+ Random seed = new Random(11);
+ switch (type) {
+ case LINEAR:
+ for (int i = 0; i < numFunctions; i++) {
+ hashFunction[i] = new LinearHash(seed.nextInt(), seed.nextInt());
+ }
+ break;
+ case POLYNOMIAL:
+ for (int i = 0; i < numFunctions; i++) {
+ hashFunction[i] = new PolynomialHash(seed.nextInt(), seed.nextInt(), seed.nextInt());
+ }
+ break;
+ case MURMUR:
+ for (int i = 0; i < numFunctions; i++) {
+ hashFunction[i] = new MurmurHashWrapper(seed.nextInt());
+ }
+ break;
+ }
+ return hashFunction;
+ }
+
+ static class LinearHash implements HashFunction {
+ private final int seedA;
+ private final int seedB;
+
+ LinearHash(int seedA, int seedB) {
+ this.seedA = seedA;
+ this.seedB = seedB;
+ }
+
+ @Override
+ public int hash(byte[] bytes) {
+ long hashValue = 31;
+ for (byte byteVal : bytes) {
+ hashValue *= seedA * byteVal;
+ hashValue += seedB;
+ }
+ return Math.abs((int) (hashValue % RandomUtils.MAX_INT_SMALLER_TWIN_PRIME));
+ }
+ }
+
+ static class PolynomialHash implements HashFunction {
+ private final int seedA;
+ private final int seedB;
+ private final int seedC;
+
+ PolynomialHash(int seedA, int seedB, int seedC) {
+ this.seedA = seedA;
+ this.seedB = seedB;
+ this.seedC = seedC;
+ }
+
+ @Override
+ public int hash(byte[] bytes) {
+ long hashValue = 31;
+ for (byte byteVal : bytes) {
+ hashValue *= seedA * (byteVal >> 4);
+ hashValue += seedB * byteVal + seedC;
+ }
+ return Math
+ .abs((int) (hashValue % RandomUtils.MAX_INT_SMALLER_TWIN_PRIME));
+ }
+ }
+
+ static class MurmurHashWrapper implements HashFunction {
+ private final int seed;
+
+ MurmurHashWrapper(int seed) {
+ this.seed = seed;
+ }
+
+ @Override
+ public int hash(byte[] bytes) {
+ long hashValue = MurmurHash.hash64A(bytes, seed);
+ return Math.abs((int) (hashValue % RandomUtils.MAX_INT_SMALLER_TWIN_PRIME));
+ }
+ }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFunction.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFunction.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFunction.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/HashFunction.java Fri Oct 1 07:40:53 2010
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.minhash;
+
+public interface HashFunction {
+
+ int hash(byte[] bytes);
+
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashDriver.java Fri Oct 1 07:40:53 2010
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.minhash;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.commandline.MinhashOptionCreator;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+public final class MinHashDriver extends AbstractJob {
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new Configuration(), new MinHashDriver(), args);
+ }
+
+ private void runJob(Path input,
+ Path output,
+ int minClusterSize,
+ int minVectorSize,
+ String hashType,
+ int numHashFunctions,
+ int keyGroups,
+ int numReduceTasks,
+ boolean debugOutput) throws IOException, ClassNotFoundException, InterruptedException {
+ Configuration conf = getConf();
+
+ conf.setInt(MinhashOptionCreator.MIN_CLUSTER_SIZE, minClusterSize);
+ conf.setInt(MinhashOptionCreator.MIN_VECTOR_SIZE, minVectorSize);
+ conf.set(MinhashOptionCreator.HASH_TYPE, hashType);
+ conf.setInt(MinhashOptionCreator.NUM_HASH_FUNCTIONS, numHashFunctions);
+ conf.setInt(MinhashOptionCreator.KEY_GROUPS, keyGroups);
+ conf.setBoolean(MinhashOptionCreator.DEBUG_OUTPUT, debugOutput);
+
+ Class<? extends Writable> outputClass =
+ debugOutput ? VectorWritable.class : Text.class;
+ Class<? extends OutputFormat> outputFormatClass =
+ debugOutput ? SequenceFileOutputFormat.class : TextOutputFormat.class;
+
+ Job job = new Job(conf, "MinHash Clustering");
+ job.setJarByClass(MinHashDriver.class);
+
+ FileInputFormat.setInputPaths(job, input);
+ FileOutputFormat.setOutputPath(job, output);
+
+ job.setMapperClass(MinHashMapper.class);
+ job.setReducerClass(MinHashReducer.class);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(outputFormatClass);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(outputClass);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(outputClass);
+
+ job.setNumReduceTasks(numReduceTasks);
+
+ job.waitForCompletion(true);
+ }
+
+ @Override
+ public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+ addInputOption();
+ addOutputOption();
+ addOption(MinhashOptionCreator.minClusterSizeOption().create());
+ addOption(MinhashOptionCreator.minVectorSizeOption().create());
+ addOption(MinhashOptionCreator.hashTypeOption().create());
+ addOption(MinhashOptionCreator.numHashFunctionsOption().create());
+ addOption(MinhashOptionCreator.keyGroupsOption().create());
+ addOption(MinhashOptionCreator.numReducersOption().create());
+ addOption(MinhashOptionCreator.debugOutputOption().create());
+
+ if (parseArguments(args) == null) {
+ return -1;
+ }
+
+ Path input = getInputPath();
+ Path output = getOutputPath();
+ int minClusterSize = Integer.valueOf(getOption(MinhashOptionCreator.MIN_CLUSTER_SIZE));
+ int minVectorSize = Integer.valueOf(getOption(MinhashOptionCreator.MIN_VECTOR_SIZE));
+ String hashType = getOption(MinhashOptionCreator.HASH_TYPE);
+ int numHashFunctions = Integer.valueOf(getOption(MinhashOptionCreator.NUM_HASH_FUNCTIONS));
+ int keyGroups = Integer.valueOf(getOption(MinhashOptionCreator.KEY_GROUPS));
+ int numReduceTasks = Integer.parseInt(getOption(MinhashOptionCreator.NUM_REDUCERS));
+ boolean debugOutput = Boolean.parseBoolean(getOption(MinhashOptionCreator.DEBUG_OUTPUT));
+
+ runJob(input,
+ output,
+ minClusterSize,
+ minVectorSize,
+ hashType,
+ numHashFunctions,
+ keyGroups,
+ numReduceTasks,
+ debugOutput);
+ return 0;
+ }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashMapper.java Fri Oct 1 07:40:53 2010
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.minhash;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.minhash.HashFactory.HashType;
+import org.apache.mahout.common.commandline.MinhashOptionCreator;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class MinHashMapper extends Mapper<Text, Writable, Text, Writable> {
+
+ private static final Logger log = LoggerFactory.getLogger(MinHashMapper.class);
+
+ private HashFunction[] hashFunction;
+ private int numHashFunctions;
+ private int keyGroups;
+ private int minVectorSize;
+ private boolean debugOutput;
+ private int[] minHashValues;
+ private byte[] bytesToHash;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
+ this.numHashFunctions = conf.getInt(MinhashOptionCreator.NUM_HASH_FUNCTIONS, 10);
+ this.minHashValues = new int[numHashFunctions];
+ this.bytesToHash = new byte[4];
+ this.keyGroups = conf.getInt(MinhashOptionCreator.KEY_GROUPS, 1);
+ this.minVectorSize = conf.getInt(MinhashOptionCreator.MIN_VECTOR_SIZE, 5);
+ String htype = conf.get(MinhashOptionCreator.HASH_TYPE, "linear");
+ this.debugOutput = conf.getBoolean(MinhashOptionCreator.DEBUG_OUTPUT, false);
+
+ HashType hashType;
+ try {
+ hashType = HashType.valueOf(htype);
+ } catch (IllegalArgumentException iae) {
+ log.warn("No valid hash type found in configuration for {}, assuming type: {}", htype, HashType.LINEAR);
+ hashType = HashType.LINEAR;
+ }
+ hashFunction = HashFactory.createHashFunctions(hashType, numHashFunctions);
+ }
+
+ /**
+ * Hash all items with each function and retain min. value for each iteration.
+ * We up with X number of minhash signatures.
+ *
+ * Now depending upon the number of key-groups (1 - 4) concatenate that many
+ * minhash values to form cluster-id as 'key' and item-id as 'value'
+ */
+ @Override
+ public void map(Text item, Writable features, Context context) throws IOException, InterruptedException {
+ Vector featureVector = ((VectorWritable) features).get();
+ if (featureVector.size() < minVectorSize) {
+ return;
+ }
+ // Initialize the minhash values to highest
+ for (int i = 0; i < numHashFunctions; i++) {
+ minHashValues[i] = Integer.MAX_VALUE;
+ }
+ int hashIndex = 0;
+ for (int i = 0; i < numHashFunctions; i++) {
+ for (Vector.Element ele : featureVector) {
+ int value = (int) ele.get();
+ bytesToHash[0] = (byte) (value >> 24);
+ bytesToHash[1] = (byte) (value >> 16);
+ bytesToHash[2] = (byte) (value >> 8);
+ bytesToHash[3] = (byte) (value);
+ hashIndex = hashFunction[i].hash(bytesToHash);
+ if (minHashValues[i] > hashIndex) {
+ minHashValues[i] = hashIndex;
+ }
+ }
+ }
+ // output the cluster information
+ for (int i = 0; i < numHashFunctions; i += keyGroups) {
+ StringBuilder clusterIdBuilder = new StringBuilder();
+ for (int j = 0; j < keyGroups && (i + j) < numHashFunctions; j++) {
+ clusterIdBuilder.append(minHashValues[i + j]).append('-');
+ }
+ String clusterId = clusterIdBuilder.toString();
+ clusterId = clusterId.substring(0, clusterId.lastIndexOf('-'));
+ Text cluster = new Text(clusterId);
+ Writable point;
+ if (debugOutput) {
+ point = new VectorWritable(featureVector.clone());
+ } else {
+ point = new Text(item.toString());
+ }
+ context.write(cluster, point);
+ }
+ }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/minhash/MinHashReducer.java Fri Oct 1 07:40:53 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.minhash;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.commandline.MinhashOptionCreator;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+public class MinHashReducer extends Reducer<Text, Writable, Text, Writable> {
+
+ private int minClusterSize;
+ private boolean debugOutput;
+
+ enum Clusters {
+ Accepted, Discarded
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
+ this.minClusterSize = conf.getInt(MinhashOptionCreator.MIN_CLUSTER_SIZE, 5);
+ this.debugOutput = conf.getBoolean(MinhashOptionCreator.DEBUG_OUTPUT, false);
+ }
+
+ /**
+ * output the items clustered
+ */
+ @Override
+ protected void reduce(Text cluster, Iterable<Writable> points, Context context)
+ throws IOException, InterruptedException {
+ Collection<Writable> pointList = new ArrayList<Writable>();
+ for (Writable point : points) {
+ if (debugOutput) {
+ Vector pointVector = ((VectorWritable) point).get().clone();
+ Writable writablePointVector = new VectorWritable(pointVector);
+ pointList.add(writablePointVector);
+ } else {
+ Writable pointText = new Text(point.toString());
+ pointList.add(pointText);
+ }
+ }
+ if (pointList.size() >= minClusterSize) {
+ context.getCounter(Clusters.Accepted).increment(1);
+ for (Writable point : pointList) {
+ context.write(cluster, point);
+ }
+ } else {
+ context.getCounter(Clusters.Discarded).increment(1);
+ }
+ }
+
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/MinhashOptionCreator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/MinhashOptionCreator.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/MinhashOptionCreator.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/MinhashOptionCreator.java Fri Oct 1 07:40:53 2010
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.common.commandline;
+
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+
+public final class MinhashOptionCreator {
+
+ public static final String NUM_HASH_FUNCTIONS = "numHashFunctions";
+ public static final String KEY_GROUPS = "keyGroups";
+ public static final String HASH_TYPE = "hashType";
+ public static final String MIN_CLUSTER_SIZE = "minClusterSize";
+ public static final String MIN_VECTOR_SIZE = "minVectorSize";
+ public static final String NUM_REDUCERS = "numReducers";
+ public static final String DEBUG_OUTPUT = "debugOutput";
+
+ private MinhashOptionCreator() {
+ }
+
+ public static DefaultOptionBuilder debugOutputOption() {
+ return new DefaultOptionBuilder()
+ .withLongName(DEBUG_OUTPUT)
+ .withShortName("debug")
+ .withArgument(
+ new ArgumentBuilder().withName(DEBUG_OUTPUT).withDefault("false")
+ .withMinimum(1).withMaximum(1).create())
+ .withDescription("Cluster the whole vectors for debugging");
+ }
+
+ public static DefaultOptionBuilder numReducersOption() {
+ return new DefaultOptionBuilder()
+ .withLongName(NUM_REDUCERS)
+ .withRequired(false)
+ .withShortName("r")
+ .withArgument(
+ new ArgumentBuilder().withName(NUM_REDUCERS).withDefault("2")
+ .withMinimum(1).withMaximum(1).create())
+ .withDescription("The number of reduce tasks. Defaults to 2");
+ }
+
+ /**
+ * Returns a default command line option for specifying the minimum cluster
+ * size in MinHash clustering
+ */
+ public static DefaultOptionBuilder minClusterSizeOption() {
+ return new DefaultOptionBuilder()
+ .withLongName(MIN_CLUSTER_SIZE)
+ .withRequired(false)
+ .withArgument(
+ new ArgumentBuilder().withName(MIN_CLUSTER_SIZE).withDefault("10")
+ .withMinimum(1).withMaximum(1).create())
+ .withDescription("Minimum points inside a cluster")
+ .withShortName("mcs");
+ }
+
+ /**
+ * Returns a default command line option for specifying the type of hash to
+ * use in MinHash clustering: Should one out of
+ * ("linear","polynomial","murmur")
+ */
+ public static DefaultOptionBuilder hashTypeOption() {
+ return new DefaultOptionBuilder()
+ .withLongName(HASH_TYPE)
+ .withRequired(false)
+ .withArgument(
+ new ArgumentBuilder().withName(HASH_TYPE).withDefault("murmur")
+ .withMinimum(1).withMaximum(1).create())
+ .withDescription(
+ "Type of hash function to use. Available types: (linear, polynomial, murmur) ")
+ .withShortName("ht");
+ }
+
+ /**
+ * Returns a default command line option for specifying the min size of the
+ * vector to hash Should one out of ("linear","polynomial","murmur")
+ */
+ public static DefaultOptionBuilder minVectorSizeOption() {
+ return new DefaultOptionBuilder()
+ .withLongName(MIN_VECTOR_SIZE)
+ .withRequired(false)
+ .withArgument(
+ new ArgumentBuilder().withName(MIN_VECTOR_SIZE).withDefault("5")
+ .withMinimum(1).withMaximum(1).create())
+ .withDescription("Minimum size of vector to be hashed")
+ .withShortName("mvs");
+ }
+
+ /**
+ * Returns a default command line option for specifying the number of hash
+ * functions to be used in MinHash clustering
+ */
+ public static DefaultOptionBuilder numHashFunctionsOption() {
+ return new DefaultOptionBuilder()
+ .withLongName(NUM_HASH_FUNCTIONS)
+ .withRequired(false)
+ .withArgument(
+ new ArgumentBuilder().withName(NUM_HASH_FUNCTIONS)
+ .withDefault("10").withMinimum(1).withMaximum(1).create())
+ .withDescription("Number of hash functions to be used")
+ .withShortName("nh");
+ }
+
+ /**
+ * Returns a default command line option for specifying the number of key
+ * groups to be used in MinHash clustering
+ */
+ public static DefaultOptionBuilder keyGroupsOption() {
+ return new DefaultOptionBuilder()
+ .withLongName(KEY_GROUPS)
+ .withRequired(false)
+ .withArgument(
+ new ArgumentBuilder().withName(KEY_GROUPS).withDefault("2")
+ .withMinimum(1).withMaximum(1).create())
+ .withDescription("Number of key groups to be used").withShortName("kg");
+ }
+}
Added: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java Fri Oct 1 07:40:53 2010
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.minhash;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.minhash.HashFactory.HashType;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.commandline.MinhashOptionCreator;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+public class TestMinHashClustering extends MahoutTestCase {
+
+ public static final double[][] REFERENCE = { { 1, 2, 3, 4, 5 },
+ { 2, 1, 3, 6, 7 }, { 3, 7, 6, 11, 8, 9 }, { 4, 7, 8, 9, 6, 1 },
+ { 5, 8, 10, 4, 1 }, { 6, 17, 14, 15 }, { 8, 9, 11, 6, 12, 1, 7 },
+ { 10, 13, 9, 7, 4, 6, 3 }, { 3, 5, 7, 9, 2, 11 }, { 13, 7, 6, 8, 5 } };
+
+ private FileSystem fs;
+ private Path input;
+ private Path output;
+
+ public static List<VectorWritable> getPointsWritable(double[][] raw) {
+ List<VectorWritable> points = new ArrayList<VectorWritable>();
+ for (double[] fr : raw) {
+ Vector vec = new SequentialAccessSparseVector(fr.length);
+ vec.assign(fr);
+ points.add(new VectorWritable(vec));
+ }
+ return points;
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ Configuration conf = new Configuration();
+ fs = FileSystem.get(conf);
+ List<VectorWritable> points = getPointsWritable(REFERENCE);
+ input = getTestTempDirPath("points");
+ output = new Path(getTestTempDirPath(), "output");
+ Path pointFile = new Path(input, "file1");
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, pointFile, Text.class, VectorWritable.class);
+ int id = 0;
+ for (VectorWritable point : points) {
+ writer.append(new Text("Id-" + id++), point);
+ }
+ writer.close();
+ }
+
+ private String[] makeArguments(int minClusterSize,
+ int minVectorSize,
+ int numHashFunctions,
+ int keyGroups,
+ String hashType){
+ return new String[] {
+ optKey(DefaultOptionCreator.INPUT_OPTION), input.toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+ optKey(MinhashOptionCreator.MIN_CLUSTER_SIZE), String.valueOf(minClusterSize),
+ optKey(MinhashOptionCreator.MIN_VECTOR_SIZE), String.valueOf(minVectorSize),
+ optKey(MinhashOptionCreator.HASH_TYPE), hashType,
+ optKey(MinhashOptionCreator.NUM_HASH_FUNCTIONS), String.valueOf(numHashFunctions),
+ optKey(MinhashOptionCreator.KEY_GROUPS), String.valueOf(keyGroups),
+ optKey(MinhashOptionCreator.NUM_REDUCERS), "1",
+ optKey(MinhashOptionCreator.DEBUG_OUTPUT), "true"};
+ }
+
+ private static Set<Integer> getValues(Vector vector) {
+ Iterator<Vector.Element> itr = vector.iterator();
+ Set<Integer> values = new HashSet<Integer>();
+ while (itr.hasNext()) {
+ values.add((int) itr.next().get());
+ }
+ return values;
+ }
+
+ private void verify(Path output) throws Exception {
+ Configuration conf = new Configuration();
+ Path outputFile = new Path(output, "part-r-00000");
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, outputFile, conf);
+ Writable clusterId = new Text();
+ VectorWritable point = new VectorWritable();
+ List<Vector> clusteredItems = new ArrayList<Vector>();
+ String prevClusterId = "";
+ while (reader.next(clusterId, point)) {
+ if (prevClusterId.equals(clusterId.toString())) {
+ clusteredItems.add(point.get().clone());
+ } else {
+ if (clusteredItems.size() > 1) {
+ // run pair-wise similarity test on items in a cluster
+ for (int i = 0; i < clusteredItems.size(); i++) {
+ Set<Integer> itemSet1 = getValues(clusteredItems.get(i));
+ for (int j = i + 1; j < clusteredItems.size(); j++) {
+ Set<Integer> itemSet2 = getValues(clusteredItems.get(j));
+ Set<Integer> union = new HashSet<Integer>();
+ union.addAll(itemSet1);
+ union.addAll(itemSet2);
+ Collection<Integer> intersect = new HashSet<Integer>();
+ intersect.addAll(itemSet1);
+ intersect.retainAll(itemSet2);
+ double similarity = intersect.size() / (double) union.size();
+ assertTrue("Sets failed min similarity test, Set1: "
+ + itemSet1 + " Set2: " + itemSet2 + ", similarity:" + similarity, similarity > 0.4);
+ }
+ }
+ }
+ clusteredItems.clear();
+ prevClusterId = clusterId.toString();
+ }
+ }
+ }
+
+ @Test
+ public void testLinearMinHashMRJob() throws Exception {
+ String[] args = makeArguments(2, 3, 20, 4, HashType.LINEAR.toString());
+ int ret = ToolRunner.run(new Configuration(), new MinHashDriver(), args);
+ assertEquals("Minhash MR Job failed for " + HashType.LINEAR.toString(), 0, ret);
+ System.out.println("Verifying linear hash results");
+ verify(output);
+ }
+
+ @Test
+ public void testPolynomialMinHashMRJob() throws Exception {
+ String[] args = makeArguments(2, 3, 20, 4, HashType.POLYNOMIAL.toString());
+ int ret = ToolRunner.run(new Configuration(), new MinHashDriver(), args);
+ assertEquals("Minhash MR Job failed for " + HashType.POLYNOMIAL.toString(), 0, ret);
+ System.out.println("Verifying linear hash results");
+ verify(output);
+ }
+
+ @Test
+ public void testMurmurMinHashMRJob() throws Exception {
+ String[] args = makeArguments(2, 3, 20, 4, HashType.MURMUR.toString());
+ int ret = ToolRunner.run(new Configuration(), new MinHashDriver(), args);
+ assertEquals("Minhash MR Job failed for " + HashType.MURMUR.toString(), 0, ret);
+ System.out.println("verifying murmur hash results");
+ verify(output);
+ }
+
+}
Added: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmClusterEvaluator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmClusterEvaluator.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmClusterEvaluator.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmClusterEvaluator.java Fri Oct 1 07:40:53 2010
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.minhash;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.HashSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+public final class LastfmClusterEvaluator {
+
+ private LastfmClusterEvaluator() {
+ }
+
+ /* Calculate used JVM memory */
+ private static String usedMemory() {
+ Runtime runtime = Runtime.getRuntime();
+ return "Used Memory: [" + ((runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024)) + " MB] ";
+ }
+
+ /**
+ * Computer Jaccard coefficient over two sets. (A intersect B) / (A union B)
+ */
+ private static double computeSimilarity(Iterable<Integer> listenerVector1, Iterable<Integer> listenerVector2) {
+ Set<Integer> first = new HashSet<Integer>();
+ for (Integer ele : listenerVector1) {
+ first.add(ele);
+ }
+ Set<Integer> second = new HashSet<Integer>();
+ for (Integer ele : listenerVector2) {
+ second.add(ele);
+ }
+
+ Set<Integer> intersection = new HashSet<Integer>(first);
+ intersection.retainAll(second);
+ double intersectSize = intersection.size();
+
+ first.addAll(second);
+ double unionSize = first.size();
+ return (unionSize == 0) ? 0.0 : (intersectSize / unionSize);
+ }
+
+ /**
+ * Calculate the overall cluster precision by sampling clusters. Precision is
+ * calculated as follows :-
+ *
+ * 1. For a sample of all the clusters calculate the pair-wise similarity
+ * (Jaccard coefficient) for items in the same cluster.
+ *
+ * 2. Count true positives as items whose similarity is above specified
+ * threshold.
+ *
+ * 3. Precision = (true positives) / (total items in clusters sampled).
+ *
+ * @param clusterFile
+ * The file containing cluster information
+ * @param threshold
+ * Similarity threshold for containing two items in a cluster to be
+ * relevant. Must be between 0.0 and 1.0
+ * @param samplePercentage
+ * Percentage of clusters to sample. Must be between 0.0 and 1.0
+ */
+ private static void testPrecision(Path clusterFile, double threshold, double samplePercentage) throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, clusterFile, conf);
+ Random rand = new Random(11);
+ Writable cluster = new Text();
+ Text prevCluster = new Text();
+ VectorWritable point = new VectorWritable();
+ List<List<Integer>> listenerVectors = new ArrayList<List<Integer>>();
+ long similarListeners = 0;
+ long allListeners = 0;
+ int clustersProcessed = 0;
+ while (reader.next(cluster, point)) {
+ if (!cluster.equals(prevCluster)) {
+ // We got a new cluster
+ prevCluster.set(cluster.toString());
+ // Should we check previous cluster ?
+ if (rand.nextDouble() > samplePercentage) {
+ listenerVectors.clear();
+ continue;
+ }
+ int numListeners = listenerVectors.size();
+ allListeners += numListeners;
+ for (int i = 0; i < numListeners; i++) {
+ List<Integer> listenerVector1 = listenerVectors.get(i);
+ for (int j = i + 1; j < numListeners; j++) {
+ List<Integer> listenerVector2 = listenerVectors.get(j);
+ double similarity = computeSimilarity(listenerVector1,
+ listenerVector2);
+ similarListeners += (similarity >= threshold) ? 1 : 0;
+ }
+ }
+ listenerVectors.clear();
+ clustersProcessed++;
+ System.out.print('\r' + usedMemory() + " Clusters processed: " + clustersProcessed);
+ }
+ List<Integer> listeners = new ArrayList<Integer>();
+ for (Vector.Element ele : point.get()) {
+ listeners.add((int) ele.get());
+ }
+ listenerVectors.add(listeners);
+ }
+ System.out.println("\nTest Results");
+ System.out.println("=============");
+ System.out.println(" (A) Listeners in same cluster with simiarity above threshold ("
+ + threshold + ") : " + similarListeners);
+ System.out.println(" (B) All listeners: " + allListeners);
+ NumberFormat format = NumberFormat.getInstance();
+ format.setMaximumFractionDigits(2);
+ double precision = ((double) similarListeners / allListeners) * 100.0;
+ System.out.println(" Average cluster precision: A/B = " + format.format(precision));
+ }
+
+ public static void main(String[] args) throws IOException {
+ if (args.length < 3) {
+ System.out.println("LastfmClusterEvaluation <cluster-file> <threshold> <sample-percentage>");
+ System.out.println(" <cluster-file>: Absolute Path of file containing cluster information in DEBUG format");
+ System.out.println(" <threshold>: Minimum threshold for jaccard co-efficient for considering two items");
+ System.out.println(" in a cluster to be really similar. Should be between 0.0 and 1.0");
+ System.out.println(" <sample-percentage>: Percentage of clusters to sample. Should be between 0.0 and 1.0");
+ return;
+ }
+ Path clusterFile = new Path(args[0]);
+ double threshold = Double.parseDouble(args[1]);
+ double samplePercentage = Double.parseDouble(args[2]);
+ testPrecision(clusterFile, threshold, samplePercentage);
+ }
+
+}
Added: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmDataConverter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmDataConverter.java?rev=1003415&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmDataConverter.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmDataConverter.java Fri Oct 1 07:40:53 2010
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.minhash;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+public final class LastfmDataConverter {
+
+ private static final Pattern TAB_PATTERN = Pattern.compile("\t");
+
+ // we are clustering similar featureIdxs on the following dataset
+ // http://www.iua.upf.es/~ocelma/MusicRecommendationDataset/index.html
+ //
+ // Preparation of the data set means gettting the dataset to a format which
+ // can
+ // be read by the min hash algorithm;
+ //
+ enum Lastfm {
+ USERS_360K(17559530),
+ USERS_1K(19150868);
+ private final int totalRecords;
+ Lastfm(int totalRecords) {
+ this.totalRecords = totalRecords;
+ }
+ }
+
+ private LastfmDataConverter() {
+ }
+
+ private static String usedMemory() {
+ Runtime runtime = Runtime.getRuntime();
+ return "Used Memory: [" + ((runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024)) + " MB] ";
+ }
+
+ /* Get the feature from the parsed record */
+ private static String getFeature(String[] fields, Lastfm dataSet) {
+ if (dataSet == Lastfm.USERS_360K) {
+ return fields[0];
+ } else {
+ return fields[2];
+ }
+ }
+
+ /* Get the item from the parsed record */
+ private static String getItem(String[] fields, Lastfm dataSet) {
+ if (dataSet == Lastfm.USERS_360K) {
+ return fields[2];
+ } else {
+ return fields[0];
+ }
+ }
+
+ /**
+ * Reads the LastFm dataset and constructs a Map of (item, features). For 360K
+ * Users dataset - (Item=Artist, Feature=User) For 1K Users dataset -
+ * (Item=User, Feature=Artist)
+ *
+ * @param inputFile
+ * Lastfm dataset file on the local file system.
+ * @param dataSet
+ * Type of dataset - 360K Users or 1K Users
+ * @return
+ */
+ public static Map<String, List<Integer>> convertToItemFeatures(String inputFile, Lastfm dataSet) throws IOException {
+ long totalRecords = dataSet.totalRecords;
+ Map<String, Integer> featureIdxMap = new HashMap<String, Integer>();
+ Map<String, List<Integer>> itemFeaturesMap = new HashMap<String, List<Integer>>();
+ String msg = usedMemory() + "Converting data to internal vector format: ";
+ BufferedReader br = new BufferedReader(new FileReader(inputFile));
+ try {
+ System.out.print(msg);
+ int prevPercentDone = 1;
+ double percentDone = 0.0;
+ long parsedRecords = 0;
+ String line;
+ while ((line = br.readLine()) != null) {
+ String[] fields = TAB_PATTERN.split(line);
+ String feature = getFeature(fields, dataSet);
+ String item = getItem(fields, dataSet);
+ // get the featureIdx
+ Integer featureIdx = featureIdxMap.get(feature);
+ if (featureIdx == null) {
+ featureIdx = featureIdxMap.size() + 1;
+ featureIdxMap.put(feature, featureIdx);
+ }
+ // add it to the corresponding feature idx map
+ List<Integer> features = itemFeaturesMap.get(item);
+ if (features == null) {
+ features = new ArrayList<Integer>();
+ itemFeaturesMap.put(item, features);
+ }
+ features.add(featureIdx);
+ parsedRecords++;
+ // Update the progress
+ percentDone = (parsedRecords * 100) / totalRecords;
+ msg = usedMemory() + "Converting data to internal vector format: ";
+ if (percentDone > prevPercentDone) {
+ System.out.print('\r' + msg + percentDone + '%');
+ prevPercentDone++;
+ }
+ parsedRecords++;
+ }
+ msg = usedMemory() + "Converting data to internal vector format: ";
+ System.out.print('\r' + msg + percentDone + "% Completed\n");
+ } finally {
+ br.close();
+ }
+ return itemFeaturesMap;
+ }
+
+ /**
+ * Converts each record in (item,features) map into Mahout vector format and
+ * writes it into sequencefile for minhash clustering
+ */
+ public static boolean writeToSequenceFile(Map<String, List<Integer>> itemFeaturesMap, Path outputPath)
+ throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+ fs.mkdirs(outputPath.getParent());
+ long totalRecords = itemFeaturesMap.size();
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, outputPath, Text.class, VectorWritable.class);
+ try {
+ String msg = "Now writing vectorized data in sequence file format: ";
+ System.out.print(msg);
+
+ Text itemWritable = new Text();
+ VectorWritable featuresWritable = new VectorWritable();
+
+ int doneRecords = 0;
+ int prevPercentDone = 1;
+
+ for (Map.Entry<String, List<Integer>> itemFeature : itemFeaturesMap.entrySet()) {
+ int numfeatures = itemFeature.getValue().size();
+ itemWritable.set(itemFeature.getKey());
+ Vector featureVector = new SequentialAccessSparseVector(numfeatures);
+ int i = 0;
+ for (Integer feature : itemFeature.getValue()) {
+ featureVector.setQuick(i++, feature);
+ }
+ featuresWritable.set(featureVector);
+ writer.append(itemWritable, featuresWritable);
+ // Update the progress
+ double percentDone = ((++doneRecords) * 100) / totalRecords;
+ if (percentDone > prevPercentDone) {
+ System.out.print('\r' + msg + percentDone + "% " + ((percentDone >= 100) ? "Completed\n" : ""));
+ prevPercentDone++;
+ }
+ }
+ } finally {
+ writer.close();
+ }
+ return true;
+ }
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 3) {
+ System.out.println("[Usage]: LastfmDataConverter <input> <output> <dataset>");
+ System.out.println(" <input>: Absolute path to the local file [usersha1-artmbid-artname-plays.tsv] ");
+ System.out.println(" <output>: Absolute path to the HDFS output file");
+ System.out.println(" <dataset>: Either of the two Lastfm public datasets. "
+ + "Must be either 'Users360K' or 'Users1K'");
+ System.out.println("Note:- Hadoop configuration pointing to HDFS namenode should be in classpath");
+ return;
+ }
+ Lastfm dataSet = Lastfm.valueOf(args[2]);
+ Map<String, List<Integer>> itemFeatures = convertToItemFeatures(args[0],
+ dataSet);
+ if (itemFeatures.isEmpty()) {
+ throw new IllegalStateException("Error converting the data file: [" + args[0] + ']');
+ }
+ Path output = new Path(args[1]);
+ boolean status = writeToSequenceFile(itemFeatures, output);
+ if (status) {
+ System.out.println("Data converted and written successfully to HDFS location: [" + output + ']');
+ } else {
+ System.err.println("Error writing the converted data to HDFS location: [" + output + ']');
+ }
+ }
+}