Posted to commits@mahout.apache.org by pa...@apache.org on 2015/04/01 20:07:46 UTC
[15/51] [partial] mahout git commit: MAHOUT-1655 Refactors mr-legacy
into mahout-hdfs and mahout-mr, closes apache/mahout#86
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/HighDFWordsPruner.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/HighDFWordsPruner.java b/mr/src/main/java/org/apache/mahout/vectorizer/HighDFWordsPruner.java
new file mode 100644
index 0000000..c3813c3
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/HighDFWordsPruner.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.vectorizer.common.PartialVectorMerger;
+import org.apache.mahout.vectorizer.pruner.PrunedPartialVectorMergeReducer;
+import org.apache.mahout.vectorizer.pruner.WordsPrunerReducer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public final class HighDFWordsPruner {
+
+ public static final String STD_CALC_DIR = "stdcalc";
+ public static final String MAX_DF = "max.df";
+ public static final String MIN_DF = "min.df";
+
+ private HighDFWordsPruner() {
+ }
+
+ public static void pruneVectors(Path tfDir, Path prunedTFDir, Path prunedPartialTFDir, long maxDF,
+ long minDF, Configuration baseConf,
+ Pair<Long[], List<Path>> docFrequenciesFeatures,
+ float normPower,
+ boolean logNormalize,
+ int numReducers) throws IOException, InterruptedException, ClassNotFoundException {
+
+ int partialVectorIndex = 0;
+ List<Path> partialVectorPaths = new ArrayList<>();
+ for (Path path : docFrequenciesFeatures.getSecond()) {
+ Path partialVectorOutputPath = new Path(prunedPartialTFDir, "partial-" + partialVectorIndex++);
+ partialVectorPaths.add(partialVectorOutputPath);
+ pruneVectorsPartial(tfDir, partialVectorOutputPath, path, maxDF, minDF, baseConf);
+ }
+
+ mergePartialVectors(partialVectorPaths, prunedTFDir, baseConf, normPower, logNormalize, numReducers);
+ HadoopUtil.delete(new Configuration(baseConf), prunedPartialTFDir);
+ }
+
+ private static void pruneVectorsPartial(Path input, Path output, Path dictionaryFilePath, long maxDF,
+ long minDF, Configuration baseConf) throws IOException, InterruptedException,
+ ClassNotFoundException {
+
+ Configuration conf = new Configuration(baseConf);
+ // this conf parameter needs to be set to enable serialisation of conf values
+ conf.set("io.serializations",
+ "org.apache.hadoop.io.serializer.JavaSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
+ conf.setLong(MAX_DF, maxDF);
+ conf.setLong(MIN_DF, minDF);
+ DistributedCache.addCacheFile(dictionaryFilePath.toUri(), conf);
+
+ Job job = HadoopUtil.prepareJob(input, output, SequenceFileInputFormat.class,
+ Mapper.class, null, null, WordsPrunerReducer.class,
+ Text.class, VectorWritable.class, SequenceFileOutputFormat.class,
+ conf);
+ job.setJobName(": Prune Vectors: input-folder: " + input
+ + ", dictionary-file: " + dictionaryFilePath.toString());
+
+ HadoopUtil.delete(conf, output);
+
+ boolean succeeded = job.waitForCompletion(true);
+ if (!succeeded) {
+ throw new IllegalStateException("Job failed!");
+ }
+ }
+
+ public static void mergePartialVectors(Iterable<Path> partialVectorPaths,
+ Path output,
+ Configuration baseConf,
+ float normPower,
+ boolean logNormalize,
+ int numReducers)
+ throws IOException, InterruptedException, ClassNotFoundException {
+
+ Configuration conf = new Configuration(baseConf);
+ // this conf parameter needs to be set to enable serialisation of conf values
+ conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
+ conf.setFloat(PartialVectorMerger.NORMALIZATION_POWER, normPower);
+ conf.setBoolean(PartialVectorMerger.LOG_NORMALIZE, logNormalize);
+
+ Job job = new Job(conf);
+ job.setJobName("PrunerPartialVectorMerger::MergePartialVectors");
+ job.setJarByClass(PartialVectorMerger.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(VectorWritable.class);
+
+ FileInputFormat.setInputPaths(job, getCommaSeparatedPaths(partialVectorPaths));
+
+ FileOutputFormat.setOutputPath(job, output);
+
+ job.setMapperClass(Mapper.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setReducerClass(PrunedPartialVectorMergeReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setNumReduceTasks(numReducers);
+
+ HadoopUtil.delete(conf, output);
+
+ boolean succeeded = job.waitForCompletion(true);
+ if (!succeeded) {
+ throw new IllegalStateException("Job failed!");
+ }
+ }
+
+ private static String getCommaSeparatedPaths(Iterable<Path> paths) {
+ StringBuilder commaSeparatedPaths = new StringBuilder(100);
+ String sep = "";
+ for (Path path : paths) {
+ commaSeparatedPaths.append(sep).append(path.toString());
+ sep = ",";
+ }
+ return commaSeparatedPaths.toString();
+ }
+}
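
For orientation, here is a minimal sketch of how this pruner might be driven end to end. The paths and thresholds are hypothetical; the calculateDF call mirrors what SparseVectorsFromSequenceFiles does later in this commit.

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.common.Pair;
    import org.apache.mahout.vectorizer.HighDFWordsPruner;
    import org.apache.mahout.vectorizer.common.PartialVectorMerger;
    import org.apache.mahout.vectorizer.tfidf.TFIDFConverter;

    public class PruneSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path outputDir = new Path("vectors");                    // hypothetical
        Path tfDir = new Path(outputDir, "tf-vectors-toprune");  // unpruned TF vectors
        Path prunedTFDir = new Path(outputDir, "tf-vectors");
        Path prunedPartialTFDir = new Path(outputDir, "tf-vectors-partial");

        // Document-frequency counts plus dictionary chunk paths.
        Pair<Long[], List<Path>> dfFeatures =
            TFIDFConverter.calculateDF(tfDir, outputDir, conf, 100);

        // Drop terms with df > 500 or df < 2; NO_NORMALIZING skips re-normalization.
        HighDFWordsPruner.pruneVectors(tfDir, prunedTFDir, prunedPartialTFDir,
            500L, 2L, conf, dfFeatures, PartialVectorMerger.NO_NORMALIZING, false, 1);
      }
    }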
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/SimpleTextEncodingVectorizer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/SimpleTextEncodingVectorizer.java b/mr/src/main/java/org/apache/mahout/vectorizer/SimpleTextEncodingVectorizer.java
new file mode 100644
index 0000000..e6339a1
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/SimpleTextEncodingVectorizer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * <p>Runs a Map/Reduce job that encodes the input using a
+ * {@link org.apache.mahout.vectorizer.encoders.FeatureVectorEncoder} and writes it to the output as a sequence file.</p>
+ *
+ * <p>Only works on basic text, where the value in the SequenceFile is a blob of text.</p>
+ */
+//TODO: find commonalities w/ DictionaryVectorizer and abstract them out
+public class SimpleTextEncodingVectorizer implements Vectorizer {
+
+ private static final Logger log = LoggerFactory.getLogger(SimpleTextEncodingVectorizer.class);
+
+ @Override
+ public void createVectors(Path input, Path output, VectorizerConfig config)
+ throws IOException, ClassNotFoundException, InterruptedException {
+ //do this for convenience of using prepareJob
+ Job job = HadoopUtil.prepareJob(input, output,
+ SequenceFileInputFormat.class,
+ EncodingMapper.class,
+ Text.class,
+ VectorWritable.class,
+ SequenceFileOutputFormat.class,
+ config.getConf());
+ Configuration conf = job.getConfiguration();
+ conf.set(EncodingMapper.USE_SEQUENTIAL, String.valueOf(config.isSequentialAccess()));
+ conf.set(EncodingMapper.USE_NAMED_VECTORS, String.valueOf(config.isNamedVectors()));
+ conf.set(EncodingMapper.ANALYZER_NAME, config.getAnalyzerClassName());
+ conf.set(EncodingMapper.ENCODER_FIELD_NAME, config.getEncoderName());
+ conf.set(EncodingMapper.ENCODER_CLASS, config.getEncoderClass());
+ conf.set(EncodingMapper.CARDINALITY, String.valueOf(config.getCardinality()));
+ job.setNumReduceTasks(0);
+ boolean finished = job.waitForCompletion(true);
+
+ log.info("result of run: {}", finished);
+ if (!finished) {
+ throw new IllegalStateException("Job failed!");
+ }
+ }
+
+}
+
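
A usage sketch follows; the encoder class and field name are illustrative assumptions, and VectorizerConfig itself appears further down in this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.mahout.vectorizer.SimpleTextEncodingVectorizer;
    import org.apache.mahout.vectorizer.VectorizerConfig;

    public class EncodingSketch {
      public static void main(String[] args) throws Exception {
        VectorizerConfig config = new VectorizerConfig(
            new Configuration(),
            StandardAnalyzer.class.getName(),
            "org.apache.mahout.vectorizer.encoders.TextValueEncoder", // assumed encoder class
            "body",      // hypothetical encoder field name
            false,       // sequentialAccess
            true,        // namedVectors
            1 << 18);    // cardinality of the hashed feature space

        new SimpleTextEncodingVectorizer().createVectors(
            new Path("seq-text"), new Path("encoded-vectors"), config); // hypothetical paths
      }
    }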
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java b/mr/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java
new file mode 100644
index 0000000..ee56124
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.lucene.AnalyzerUtils;
+import org.apache.mahout.math.hadoop.stats.BasicStats;
+import org.apache.mahout.vectorizer.collocations.llr.LLRReducer;
+import org.apache.mahout.vectorizer.common.PartialVectorMerger;
+import org.apache.mahout.vectorizer.tfidf.TFIDFConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Converts a given set of sequence files into SparseVectors
+ */
+public final class SparseVectorsFromSequenceFiles extends AbstractJob {
+
+ private static final Logger log = LoggerFactory.getLogger(SparseVectorsFromSequenceFiles.class);
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new SparseVectorsFromSequenceFiles(), args);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+ ArgumentBuilder abuilder = new ArgumentBuilder();
+ GroupBuilder gbuilder = new GroupBuilder();
+
+ Option inputDirOpt = DefaultOptionCreator.inputOption().create();
+
+ Option outputDirOpt = DefaultOptionCreator.outputOption().create();
+
+ Option minSupportOpt = obuilder.withLongName("minSupport").withArgument(
+ abuilder.withName("minSupport").withMinimum(1).withMaximum(1).create()).withDescription(
+ "(Optional) Minimum Support. Default Value: 2").withShortName("s").create();
+
+ Option analyzerNameOpt = obuilder.withLongName("analyzerName").withArgument(
+ abuilder.withName("analyzerName").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The class name of the analyzer").withShortName("a").create();
+
+ Option chunkSizeOpt = obuilder.withLongName("chunkSize").withArgument(
+ abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The chunkSize in MegaBytes. Default Value: 100MB").withShortName("chunk").create();
+
+ Option weightOpt = obuilder.withLongName("weight").withRequired(false).withArgument(
+ abuilder.withName("weight").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The kind of weight to use. Currently TF or TFIDF. Default: TFIDF").withShortName("wt").create();
+
+ Option minDFOpt = obuilder.withLongName("minDF").withRequired(false).withArgument(
+ abuilder.withName("minDF").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The minimum document frequency. Default is 1").withShortName("md").create();
+
+ Option maxDFPercentOpt = obuilder.withLongName("maxDFPercent").withRequired(false).withArgument(
+ abuilder.withName("maxDFPercent").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The max percentage of docs for the DF. Can be used to remove really high frequency terms."
+ + " Expressed as an integer between 0 and 100. Default is 99. If maxDFSigma is also set, "
+ + "it will override this value.").withShortName("x").create();
+
+ Option maxDFSigmaOpt = obuilder.withLongName("maxDFSigma").withRequired(false).withArgument(
+ abuilder.withName("maxDFSigma").withMinimum(1).withMaximum(1).create()).withDescription(
+ "What portion of the tf (tf-idf) vectors to be used, expressed in times the standard deviation (sigma) "
+ + "of the document frequencies of these vectors. Can be used to remove really high frequency terms."
+ + " Expressed as a double value. Good value to be specified is 3.0. In case the value is less "
+ + "than 0 no vectors will be filtered out. Default is -1.0. Overrides maxDFPercent")
+ .withShortName("xs").create();
+
+ Option minLLROpt = obuilder.withLongName("minLLR").withRequired(false).withArgument(
+ abuilder.withName("minLLR").withMinimum(1).withMaximum(1).create()).withDescription(
+ "(Optional)The minimum Log Likelihood Ratio(Float) Default is " + LLRReducer.DEFAULT_MIN_LLR)
+ .withShortName("ml").create();
+
+ Option numReduceTasksOpt = obuilder.withLongName("numReducers").withArgument(
+ abuilder.withName("numReducers").withMinimum(1).withMaximum(1).create()).withDescription(
+ "(Optional) Number of reduce tasks. Default Value: 1").withShortName("nr").create();
+
+ Option powerOpt = obuilder.withLongName("norm").withRequired(false).withArgument(
+ abuilder.withName("norm").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The norm to use, expressed as either a float or \"INF\" if you want to use the Infinite norm. "
+ + "Must be greater or equal to 0. The default is not to normalize").withShortName("n").create();
+
+ Option logNormalizeOpt = obuilder.withLongName("logNormalize").withRequired(false)
+ .withDescription(
+ "(Optional) Whether output vectors should be logNormalize. If set true else false")
+ .withShortName("lnorm").create();
+
+ Option maxNGramSizeOpt = obuilder.withLongName("maxNGramSize").withRequired(false).withArgument(
+ abuilder.withName("ngramSize").withMinimum(1).withMaximum(1).create())
+ .withDescription(
+ "(Optional) The maximum size of ngrams to create"
+ + " (2 = bigrams, 3 = trigrams, etc) Default Value:1").withShortName("ng").create();
+
+ Option sequentialAccessVectorOpt = obuilder.withLongName("sequentialAccessVector").withRequired(false)
+ .withDescription(
+ "(Optional) Whether output vectors should be SequentialAccessVectors. If set true else false")
+ .withShortName("seq").create();
+
+ Option namedVectorOpt = obuilder.withLongName("namedVector").withRequired(false)
+ .withDescription(
+ "(Optional) Whether output vectors should be NamedVectors. If set true else false")
+ .withShortName("nv").create();
+
+ Option overwriteOutput = obuilder.withLongName("overwrite").withRequired(false).withDescription(
+ "If set, overwrite the output directory").withShortName("ow").create();
+ Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h")
+ .create();
+
+ Group group = gbuilder.withName("Options").withOption(minSupportOpt).withOption(analyzerNameOpt)
+ .withOption(chunkSizeOpt).withOption(outputDirOpt).withOption(inputDirOpt).withOption(minDFOpt)
+ .withOption(maxDFSigmaOpt).withOption(maxDFPercentOpt).withOption(weightOpt).withOption(powerOpt)
+ .withOption(minLLROpt).withOption(numReduceTasksOpt).withOption(maxNGramSizeOpt).withOption(overwriteOutput)
+ .withOption(helpOpt).withOption(sequentialAccessVectorOpt).withOption(namedVectorOpt)
+ .withOption(logNormalizeOpt)
+ .create();
+ try {
+ Parser parser = new Parser();
+ parser.setGroup(group);
+ parser.setHelpOption(helpOpt);
+ CommandLine cmdLine = parser.parse(args);
+
+ if (cmdLine.hasOption(helpOpt)) {
+ CommandLineUtil.printHelp(group);
+ return -1;
+ }
+
+ Path inputDir = new Path((String) cmdLine.getValue(inputDirOpt));
+ Path outputDir = new Path((String) cmdLine.getValue(outputDirOpt));
+
+ int chunkSize = 100;
+ if (cmdLine.hasOption(chunkSizeOpt)) {
+ chunkSize = Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
+ }
+ int minSupport = 2;
+ if (cmdLine.hasOption(minSupportOpt)) {
+ String minSupportString = (String) cmdLine.getValue(minSupportOpt);
+ minSupport = Integer.parseInt(minSupportString);
+ }
+
+ int maxNGramSize = 1;
+
+ if (cmdLine.hasOption(maxNGramSizeOpt)) {
+ try {
+ maxNGramSize = Integer.parseInt(cmdLine.getValue(maxNGramSizeOpt).toString());
+ } catch (NumberFormatException ex) {
+ log.warn("Could not parse ngram size option");
+ }
+ }
+ log.info("Maximum n-gram size is: {}", maxNGramSize);
+
+ if (cmdLine.hasOption(overwriteOutput)) {
+ HadoopUtil.delete(getConf(), outputDir);
+ }
+
+ float minLLRValue = LLRReducer.DEFAULT_MIN_LLR;
+ if (cmdLine.hasOption(minLLROpt)) {
+ minLLRValue = Float.parseFloat(cmdLine.getValue(minLLROpt).toString());
+ }
+ log.info("Minimum LLR value: {}", minLLRValue);
+
+ int reduceTasks = 1;
+ if (cmdLine.hasOption(numReduceTasksOpt)) {
+ reduceTasks = Integer.parseInt(cmdLine.getValue(numReduceTasksOpt).toString());
+ }
+ log.info("Number of reduce tasks: {}", reduceTasks);
+
+ Class<? extends Analyzer> analyzerClass = StandardAnalyzer.class;
+ if (cmdLine.hasOption(analyzerNameOpt)) {
+ String className = cmdLine.getValue(analyzerNameOpt).toString();
+ analyzerClass = Class.forName(className).asSubclass(Analyzer.class);
+ // try instantiating it, b/c there isn't any point in setting it if
+ // you can't instantiate it
+ AnalyzerUtils.createAnalyzer(analyzerClass);
+ }
+
+ boolean processIdf;
+
+ if (cmdLine.hasOption(weightOpt)) {
+ String wString = cmdLine.getValue(weightOpt).toString();
+ if ("tf".equalsIgnoreCase(wString)) {
+ processIdf = false;
+ } else if ("tfidf".equalsIgnoreCase(wString)) {
+ processIdf = true;
+ } else {
+ throw new OptionException(weightOpt);
+ }
+ } else {
+ processIdf = true;
+ }
+
+ int minDf = 1;
+ if (cmdLine.hasOption(minDFOpt)) {
+ minDf = Integer.parseInt(cmdLine.getValue(minDFOpt).toString());
+ }
+ int maxDFPercent = 99;
+ if (cmdLine.hasOption(maxDFPercentOpt)) {
+ maxDFPercent = Integer.parseInt(cmdLine.getValue(maxDFPercentOpt).toString());
+ }
+ double maxDFSigma = -1.0;
+ if (cmdLine.hasOption(maxDFSigmaOpt)) {
+ maxDFSigma = Double.parseDouble(cmdLine.getValue(maxDFSigmaOpt).toString());
+ }
+
+ float norm = PartialVectorMerger.NO_NORMALIZING;
+ if (cmdLine.hasOption(powerOpt)) {
+ String power = cmdLine.getValue(powerOpt).toString();
+ if ("INF".equals(power)) {
+ norm = Float.POSITIVE_INFINITY;
+ } else {
+ norm = Float.parseFloat(power);
+ }
+ }
+
+ boolean logNormalize = false;
+ if (cmdLine.hasOption(logNormalizeOpt)) {
+ logNormalize = true;
+ }
+ log.info("Tokenizing documents in {}", inputDir);
+ Configuration conf = getConf();
+ Path tokenizedPath = new Path(outputDir, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER);
+ //TODO: move this into DictionaryVectorizer , and then fold SparseVectorsFrom with EncodedVectorsFrom
+ // to have one framework for all of this.
+ DocumentProcessor.tokenizeDocuments(inputDir, analyzerClass, tokenizedPath, conf);
+
+ boolean sequentialAccessOutput = false;
+ if (cmdLine.hasOption(sequentialAccessVectorOpt)) {
+ sequentialAccessOutput = true;
+ }
+
+ boolean namedVectors = false;
+ if (cmdLine.hasOption(namedVectorOpt)) {
+ namedVectors = true;
+ }
+ boolean shouldPrune = maxDFSigma >= 0.0 || maxDFPercent > 0.00;
+ String tfDirName = shouldPrune
+ ? DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER + "-toprune"
+ : DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER;
+ log.info("Creating Term Frequency Vectors");
+ if (processIdf) {
+ DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath,
+ outputDir,
+ tfDirName,
+ conf,
+ minSupport,
+ maxNGramSize,
+ minLLRValue,
+ -1.0f,
+ false,
+ reduceTasks,
+ chunkSize,
+ sequentialAccessOutput,
+ namedVectors);
+ } else {
+ DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath,
+ outputDir,
+ tfDirName,
+ conf,
+ minSupport,
+ maxNGramSize,
+ minLLRValue,
+ norm,
+ logNormalize,
+ reduceTasks,
+ chunkSize,
+ sequentialAccessOutput,
+ namedVectors);
+ }
+
+ Pair<Long[], List<Path>> docFrequenciesFeatures = null;
+ // Should document frequency features be processed?
+ if (shouldPrune || processIdf) {
+ log.info("Calculating IDF");
+ docFrequenciesFeatures =
+ TFIDFConverter.calculateDF(new Path(outputDir, tfDirName), outputDir, conf, chunkSize);
+ }
+
+ long maxDF = maxDFPercent; //if we are pruning by std dev, then this will get changed
+ if (shouldPrune) {
+ long vectorCount = docFrequenciesFeatures.getFirst()[1];
+ if (maxDFSigma >= 0.0) {
+ Path dfDir = new Path(outputDir, TFIDFConverter.WORDCOUNT_OUTPUT_FOLDER);
+ Path stdCalcDir = new Path(outputDir, HighDFWordsPruner.STD_CALC_DIR);
+
+ // Calculate the standard deviation
+ double stdDev = BasicStats.stdDevForGivenMean(dfDir, stdCalcDir, 0.0, conf);
+ maxDF = (int) (100.0 * maxDFSigma * stdDev / vectorCount);
+ }
+
+ long maxDFThreshold = (long) (vectorCount * (maxDF / 100.0f));
+
+ // Prune the term frequency vectors
+ Path tfDir = new Path(outputDir, tfDirName);
+ Path prunedTFDir = new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER);
+ Path prunedPartialTFDir =
+ new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER + "-partial");
+ log.info("Pruning");
+ if (processIdf) {
+ HighDFWordsPruner.pruneVectors(tfDir,
+ prunedTFDir,
+ prunedPartialTFDir,
+ maxDFThreshold,
+ minDf,
+ conf,
+ docFrequenciesFeatures,
+ -1.0f,
+ false,
+ reduceTasks);
+ } else {
+ HighDFWordsPruner.pruneVectors(tfDir,
+ prunedTFDir,
+ prunedPartialTFDir,
+ maxDFThreshold,
+ minDf,
+ conf,
+ docFrequenciesFeatures,
+ norm,
+ logNormalize,
+ reduceTasks);
+ }
+ HadoopUtil.delete(new Configuration(conf), tfDir);
+ }
+ if (processIdf) {
+ TFIDFConverter.processTfIdf(
+ new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER),
+ outputDir, conf, docFrequenciesFeatures, minDf, maxDF, norm, logNormalize,
+ sequentialAccessOutput, namedVectors, reduceTasks);
+ }
+ } catch (OptionException e) {
+ log.error("Exception", e);
+ CommandLineUtil.printHelp(group);
+ }
+ return 0;
+ }
+
+}
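
The driver can also be launched programmatically through ToolRunner, using the same flags the parser above defines; the paths and values here are hypothetical.

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.vectorizer.SparseVectorsFromSequenceFiles;

    public class SparseVectorsSketch {
      public static void main(String[] args) throws Exception {
        ToolRunner.run(new SparseVectorsFromSequenceFiles(), new String[] {
            "--input", "reuters-seqfiles",   // SequenceFile<Text,Text> of raw documents
            "--output", "reuters-vectors",
            "--weight", "tfidf",             // or "tf"
            "--maxNGramSize", "2",           // unigrams and bigrams
            "--maxDFPercent", "85",          // prune terms present in over 85% of docs
            "--namedVector",
            "--overwrite"
        });
      }
    }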
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/TF.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/TF.java b/mr/src/main/java/org/apache/mahout/vectorizer/TF.java
new file mode 100644
index 0000000..1818580
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/TF.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer;
+
+/**
+ * {@link Weight} based on term frequency only
+ */
+public class TF implements Weight {
+
+ @Override
+ public double calculate(int tf, int df, int length, int numDocs) {
+ //ignore length
+ return tf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/TFIDF.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/TFIDF.java b/mr/src/main/java/org/apache/mahout/vectorizer/TFIDF.java
new file mode 100644
index 0000000..0a537eb
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/TFIDF.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer;
+
+import org.apache.lucene.search.similarities.DefaultSimilarity;
+//TODO: add a new class that supports arbitrary Lucene similarity implementations
+public class TFIDF implements Weight {
+
+ private final DefaultSimilarity sim = new DefaultSimilarity();
+
+ @Override
+ public double calculate(int tf, int df, int length, int numDocs) {
+ // ignore length
+ return sim.tf(tf) * sim.idf(df, numDocs);
+ }
+}
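
A small sketch contrasting the two weights. The formulas in the comments are Lucene DefaultSimilarity's defaults: tf is weighted as sqrt(tf), and idf as 1 + ln(numDocs / (df + 1)).

    import org.apache.mahout.vectorizer.TF;
    import org.apache.mahout.vectorizer.TFIDF;
    import org.apache.mahout.vectorizer.Weight;

    public class WeightSketch {
      public static void main(String[] args) {
        Weight tf = new TF();
        Weight tfidf = new TFIDF();

        // A term occurring 4 times in a document, in 10 of 1000 documents:
        System.out.println(tf.calculate(4, 10, 0, 1000));    // 4.0 (raw term frequency)
        // sqrt(4) * (1 + ln(1000 / 11)) = 2.0 * 5.51 ~= 11.0
        System.out.println(tfidf.calculate(4, 10, 0, 1000));
      }
    }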
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/Vectorizer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/Vectorizer.java b/mr/src/main/java/org/apache/mahout/vectorizer/Vectorizer.java
new file mode 100644
index 0000000..45f0043
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/Vectorizer.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+public interface Vectorizer {
+
+ void createVectors(Path input, Path output, VectorizerConfig config)
+ throws IOException, ClassNotFoundException, InterruptedException;
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/VectorizerConfig.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/VectorizerConfig.java b/mr/src/main/java/org/apache/mahout/vectorizer/VectorizerConfig.java
new file mode 100644
index 0000000..edaf2f3
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/VectorizerConfig.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The config for a Vectorizer. Not all implementations need use all variables.
+ */
+public final class VectorizerConfig {
+
+ private Configuration conf;
+ private String analyzerClassName;
+ private String encoderName;
+ private boolean sequentialAccess;
+ private boolean namedVectors;
+ private int cardinality;
+ private String encoderClass;
+ private String tfDirName;
+ private int minSupport;
+ private int maxNGramSize;
+ private float minLLRValue;
+ private float normPower;
+ private boolean logNormalize;
+ private int numReducers;
+ private int chunkSizeInMegabytes;
+
+ public VectorizerConfig(Configuration conf,
+ String analyzerClassName,
+ String encoderClass,
+ String encoderName,
+ boolean sequentialAccess,
+ boolean namedVectors,
+ int cardinality) {
+ this.conf = conf;
+ this.analyzerClassName = analyzerClassName;
+ this.encoderClass = encoderClass;
+ this.encoderName = encoderName;
+ this.sequentialAccess = sequentialAccess;
+ this.namedVectors = namedVectors;
+ this.cardinality = cardinality;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public String getAnalyzerClassName() {
+ return analyzerClassName;
+ }
+
+ public void setAnalyzerClassName(String analyzerClassName) {
+ this.analyzerClassName = analyzerClassName;
+ }
+
+ public String getEncoderName() {
+ return encoderName;
+ }
+
+ public void setEncoderName(String encoderName) {
+ this.encoderName = encoderName;
+ }
+
+ public boolean isSequentialAccess() {
+ return sequentialAccess;
+ }
+
+ public void setSequentialAccess(boolean sequentialAccess) {
+ this.sequentialAccess = sequentialAccess;
+ }
+
+
+ public String getTfDirName() {
+ return tfDirName;
+ }
+
+ public void setTfDirName(String tfDirName) {
+ this.tfDirName = tfDirName;
+ }
+
+ public boolean isNamedVectors() {
+ return namedVectors;
+ }
+
+ public void setNamedVectors(boolean namedVectors) {
+ this.namedVectors = namedVectors;
+ }
+
+ public int getCardinality() {
+ return cardinality;
+ }
+
+ public void setCardinality(int cardinality) {
+ this.cardinality = cardinality;
+ }
+
+ public String getEncoderClass() {
+ return encoderClass;
+ }
+
+ public void setEncoderClass(String encoderClass) {
+ this.encoderClass = encoderClass;
+ }
+
+ public int getMinSupport() {
+ return minSupport;
+ }
+
+ public void setMinSupport(int minSupport) {
+ this.minSupport = minSupport;
+ }
+
+ public int getMaxNGramSize() {
+ return maxNGramSize;
+ }
+
+ public void setMaxNGramSize(int maxNGramSize) {
+ this.maxNGramSize = maxNGramSize;
+ }
+
+ public float getMinLLRValue() {
+ return minLLRValue;
+ }
+
+ public void setMinLLRValue(float minLLRValue) {
+ this.minLLRValue = minLLRValue;
+ }
+
+ public float getNormPower() {
+ return normPower;
+ }
+
+ public void setNormPower(float normPower) {
+ this.normPower = normPower;
+ }
+
+ public boolean isLogNormalize() {
+ return logNormalize;
+ }
+
+ public void setLogNormalize(boolean logNormalize) {
+ this.logNormalize = logNormalize;
+ }
+
+ public int getNumReducers() {
+ return numReducers;
+ }
+
+ public void setNumReducers(int numReducers) {
+ this.numReducers = numReducers;
+ }
+
+ public int getChunkSizeInMegabytes() {
+ return chunkSizeInMegabytes;
+ }
+
+ public void setChunkSizeInMegabytes(int chunkSizeInMegabytes) {
+ this.chunkSizeInMegabytes = chunkSizeInMegabytes;
+ }
+}
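
Beyond the constructor, the remaining fields are filled in through setters; a sketch with illustrative values (the encoder class is an assumption):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.mahout.vectorizer.VectorizerConfig;

    public class ConfigSketch {
      public static void main(String[] args) {
        VectorizerConfig config = new VectorizerConfig(new Configuration(),
            StandardAnalyzer.class.getName(),
            "org.apache.mahout.vectorizer.encoders.TextValueEncoder", // assumed
            "body", false, true, 1 << 18);
        config.setMinSupport(2);        // drop ngrams seen fewer than 2 times
        config.setMaxNGramSize(2);      // unigrams and bigrams
        config.setMinLLRValue(50.0f);   // LLR threshold for keeping collocations
        config.setNormPower(2.0f);      // L2-normalize merged vectors
        config.setLogNormalize(false);
        config.setNumReducers(4);
        config.setChunkSizeInMegabytes(200);
      }
    }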
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/Weight.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/Weight.java b/mr/src/main/java/org/apache/mahout/vectorizer/Weight.java
new file mode 100644
index 0000000..e36159d
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/Weight.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer;
+
+public interface Weight {
+
+ /**
+ * Experimental
+ *
+ * @param tf term freq
+ * @param df doc freq
+ * @param length Length of the document
+ * @param numDocs the total number of docs
+ * @return The weight
+ */
+ double calculate(int tf, int df, int length, int numDocs);
+}
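
Because the interface is a single method, alternative weightings are easy to plug in; a hypothetical sublinear-TF variant, for illustration only:

    import org.apache.mahout.vectorizer.Weight;

    /** Hypothetical sublinear-TF weight: (1 + ln(tf)) * (1 + ln(numDocs / (df + 1))). */
    public class SublinearTfIdf implements Weight {
      @Override
      public double calculate(int tf, int df, int length, int numDocs) {
        if (tf <= 0) {
          return 0.0;
        }
        double idf = 1.0 + Math.log((double) numDocs / (df + 1));
        return (1.0 + Math.log(tf)) * idf;
      }
    }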
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocCombiner.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocCombiner.java b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocCombiner.java
new file mode 100644
index 0000000..54cadbd
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocCombiner.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.collocations.llr;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Reducer;
+
+/** Combiner for pass 1 of the CollocDriver. Combines the frequencies of values that share the same key. */
+public class CollocCombiner extends Reducer<GramKey, Gram, GramKey, Gram> {
+
+ @Override
+ protected void reduce(GramKey key, Iterable<Gram> values, Context context) throws IOException, InterruptedException {
+
+ int freq = 0;
+ Gram value = null;
+
+ // accumulate frequencies from values, preserve the last value
+ // to write to the context.
+ for (Gram value1 : values) {
+ value = value1;
+ freq += value.getFrequency();
+ }
+
+ if (value != null) {
+ value.setFrequency(freq);
+ context.write(key, value);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocDriver.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocDriver.java b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocDriver.java
new file mode 100644
index 0000000..a07ddbd
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocDriver.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.collocations.llr;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.lucene.AnalyzerUtils;
+import org.apache.mahout.vectorizer.DocumentProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Driver for LLR Collocation discovery mapreduce job */
+public final class CollocDriver extends AbstractJob {
+ //public static final String DEFAULT_OUTPUT_DIRECTORY = "output";
+
+ public static final String SUBGRAM_OUTPUT_DIRECTORY = "subgrams";
+
+ public static final String NGRAM_OUTPUT_DIRECTORY = "ngrams";
+
+ public static final String EMIT_UNIGRAMS = "emit-unigrams";
+
+ public static final boolean DEFAULT_EMIT_UNIGRAMS = false;
+
+ private static final int DEFAULT_MAX_NGRAM_SIZE = 2;
+
+ private static final int DEFAULT_PASS1_NUM_REDUCE_TASKS = 1;
+
+ private static final Logger log = LoggerFactory.getLogger(CollocDriver.class);
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new CollocDriver(), args);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ addInputOption();
+ addOutputOption();
+ addOption(DefaultOptionCreator.numReducersOption().create());
+
+ addOption("maxNGramSize",
+ "ng",
+ "(Optional) The max size of ngrams to create (2 = bigrams, 3 = trigrams, etc) default: 2",
+ String.valueOf(DEFAULT_MAX_NGRAM_SIZE));
+ addOption("minSupport", "s", "(Optional) Minimum Support. Default Value: "
+ + CollocReducer.DEFAULT_MIN_SUPPORT, String.valueOf(CollocReducer.DEFAULT_MIN_SUPPORT));
+ addOption("minLLR", "ml", "(Optional)The minimum Log Likelihood Ratio(Float) Default is "
+ + LLRReducer.DEFAULT_MIN_LLR, String.valueOf(LLRReducer.DEFAULT_MIN_LLR));
+ addOption(DefaultOptionCreator.overwriteOption().create());
+ addOption("analyzerName", "a", "The class name of the analyzer to use for preprocessing", null);
+
+ addFlag("preprocess", "p", "If set, input is SequenceFile<Text,Text> where the value is the document, "
+ + " which will be tokenized using the specified analyzer.");
+ addFlag("unigram", "u", "If set, unigrams will be emitted in the final output alongside collocations");
+
+ Map<String, List<String>> argMap = parseArguments(args);
+
+ if (argMap == null) {
+ return -1;
+ }
+
+ Path input = getInputPath();
+ Path output = getOutputPath();
+
+ int maxNGramSize = DEFAULT_MAX_NGRAM_SIZE;
+ if (hasOption("maxNGramSize")) {
+ try {
+ maxNGramSize = Integer.parseInt(getOption("maxNGramSize"));
+ } catch (NumberFormatException ex) {
+ log.warn("Could not parse ngram size option");
+ }
+ }
+ log.info("Maximum n-gram size is: {}", maxNGramSize);
+
+ if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+ HadoopUtil.delete(getConf(), output);
+ }
+
+ int minSupport = CollocReducer.DEFAULT_MIN_SUPPORT;
+ if (getOption("minSupport") != null) {
+ minSupport = Integer.parseInt(getOption("minSupport"));
+ }
+ log.info("Minimum Support value: {}", minSupport);
+
+ float minLLRValue = LLRReducer.DEFAULT_MIN_LLR;
+ if (getOption("minLLR") != null) {
+ minLLRValue = Float.parseFloat(getOption("minLLR"));
+ }
+ log.info("Minimum LLR value: {}", minLLRValue);
+
+ int reduceTasks = DEFAULT_PASS1_NUM_REDUCE_TASKS;
+ if (getOption("maxRed") != null) {
+ reduceTasks = Integer.parseInt(getOption("maxRed"));
+ }
+ log.info("Number of pass1 reduce tasks: {}", reduceTasks);
+
+ boolean emitUnigrams = argMap.containsKey("emitUnigrams");
+
+ if (argMap.containsKey("preprocess")) {
+ log.info("Input will be preprocessed");
+ Class<? extends Analyzer> analyzerClass = StandardAnalyzer.class;
+ if (getOption("analyzerName") != null) {
+ String className = getOption("analyzerName");
+ analyzerClass = Class.forName(className).asSubclass(Analyzer.class);
+ // try instantiating it, b/c there isn't any point in setting it if
+ // you can't instantiate it
+ AnalyzerUtils.createAnalyzer(analyzerClass);
+ }
+
+ Path tokenizedPath = new Path(output, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER);
+
+ DocumentProcessor.tokenizeDocuments(input, analyzerClass, tokenizedPath, getConf());
+ input = tokenizedPath;
+ } else {
+ log.info("Input will NOT be preprocessed");
+ }
+
+ // parse input and extract collocations
+ long ngramCount =
+ generateCollocations(input, output, getConf(), emitUnigrams, maxNGramSize, reduceTasks, minSupport);
+
+ // tally collocations and perform LLR calculation
+ computeNGramsPruneByLLR(output, getConf(), ngramCount, emitUnigrams, minLLRValue, reduceTasks);
+
+ return 0;
+ }
+
+ /**
+ * Generate all ngrams for the {@link org.apache.mahout.vectorizer.DictionaryVectorizer} job
+ *
+ * @param input
+ * input path containing tokenized documents
+ * @param output
+ * output path where ngrams are generated including unigrams
+ * @param baseConf
+ * job configuration
+ * @param maxNGramSize
+ * the maximum ngram size to generate (minimum value: 2)
+ * @param minSupport
+ * minimum support to prune ngrams including unigrams
+ * @param minLLRValue
+ * minimum threshold to prune ngrams
+ * @param reduceTasks
+ * number of reducers used
+ */
+ public static void generateAllGrams(Path input,
+ Path output,
+ Configuration baseConf,
+ int maxNGramSize,
+ int minSupport,
+ float minLLRValue,
+ int reduceTasks)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ // parse input and extract collocations
+ long ngramCount = generateCollocations(input, output, baseConf, true, maxNGramSize, reduceTasks, minSupport);
+
+ // tally collocations and perform LLR calculation
+ computeNGramsPruneByLLR(output, baseConf, ngramCount, true, minLLRValue, reduceTasks);
+ }
+
+ /**
+ * pass1: generate collocations, ngrams
+ */
+ private static long generateCollocations(Path input,
+ Path output,
+ Configuration baseConf,
+ boolean emitUnigrams,
+ int maxNGramSize,
+ int reduceTasks,
+ int minSupport)
+ throws IOException, ClassNotFoundException, InterruptedException {
+
+ Configuration con = new Configuration(baseConf);
+ con.setBoolean(EMIT_UNIGRAMS, emitUnigrams);
+ con.setInt(CollocMapper.MAX_SHINGLE_SIZE, maxNGramSize);
+ con.setInt(CollocReducer.MIN_SUPPORT, minSupport);
+
+ Job job = new Job(con);
+ job.setJobName(CollocDriver.class.getSimpleName() + ".generateCollocations:" + input);
+ job.setJarByClass(CollocDriver.class);
+
+ job.setMapOutputKeyClass(GramKey.class);
+ job.setMapOutputValueClass(Gram.class);
+ job.setPartitionerClass(GramKeyPartitioner.class);
+ job.setGroupingComparatorClass(GramKeyGroupComparator.class);
+
+ job.setOutputKeyClass(Gram.class);
+ job.setOutputValueClass(Gram.class);
+
+ job.setCombinerClass(CollocCombiner.class);
+
+ FileInputFormat.setInputPaths(job, input);
+
+ Path outputPath = new Path(output, SUBGRAM_OUTPUT_DIRECTORY);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setMapperClass(CollocMapper.class);
+
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setReducerClass(CollocReducer.class);
+ job.setNumReduceTasks(reduceTasks);
+
+ boolean succeeded = job.waitForCompletion(true);
+ if (!succeeded) {
+ throw new IllegalStateException("Job failed!");
+ }
+
+ return job.getCounters().findCounter(CollocMapper.Count.NGRAM_TOTAL).getValue();
+ }
+
+ /**
+ * pass2: perform the LLR calculation
+ */
+ private static void computeNGramsPruneByLLR(Path output,
+ Configuration baseConf,
+ long nGramTotal,
+ boolean emitUnigrams,
+ float minLLRValue,
+ int reduceTasks)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Configuration conf = new Configuration(baseConf);
+ conf.setLong(LLRReducer.NGRAM_TOTAL, nGramTotal);
+ conf.setBoolean(EMIT_UNIGRAMS, emitUnigrams);
+ conf.setFloat(LLRReducer.MIN_LLR, minLLRValue);
+
+ Job job = new Job(conf);
+ job.setJobName(CollocDriver.class.getSimpleName() + ".computeNGrams: " + output);
+ job.setJarByClass(CollocDriver.class);
+
+ job.setMapOutputKeyClass(Gram.class);
+ job.setMapOutputValueClass(Gram.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(DoubleWritable.class);
+
+ FileInputFormat.setInputPaths(job, new Path(output, SUBGRAM_OUTPUT_DIRECTORY));
+ Path outPath = new Path(output, NGRAM_OUTPUT_DIRECTORY);
+ FileOutputFormat.setOutputPath(job, outPath);
+
+ job.setMapperClass(Mapper.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setReducerClass(LLRReducer.class);
+ job.setNumReduceTasks(reduceTasks);
+
+ boolean succeeded = job.waitForCompletion(true);
+ if (!succeeded) {
+ throw new IllegalStateException("Job failed!");
+ }
+ }
+}
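
A minimal programmatic sketch of generateAllGrams; the paths are hypothetical, and the input must already be tokenized (SequenceFile<Text,StringTuple>), e.g. by DocumentProcessor.tokenizeDocuments as in run() above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.vectorizer.collocations.llr.CollocDriver;

    public class CollocSketch {
      public static void main(String[] args) throws Exception {
        CollocDriver.generateAllGrams(
            new Path("tokenized-documents"), // hypothetical
            new Path("colloc-output"),
            new Configuration(),
            2,      // up to bigrams
            2,      // minimum support
            25.0f,  // minimum log-likelihood ratio
            1);     // reduce tasks
        // Subgrams land in colloc-output/subgrams, scored ngrams in colloc-output/ngrams.
      }
    }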
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocMapper.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocMapper.java b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocMapper.java
new file mode 100644
index 0000000..fd99293
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocMapper.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.collocations.llr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.lucene.analysis.shingle.ShingleFilter;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.analysis.tokenattributes.TypeAttribute;
+import org.apache.mahout.common.StringTuple;
+import org.apache.mahout.common.lucene.IteratorTokenStream;
+import org.apache.mahout.math.function.ObjectIntProcedure;
+import org.apache.mahout.math.map.OpenObjectIntHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Pass 1 of the Collocation discovery job, which generates ngrams and emits ngrams and their component (n-1)grams.
+ * Input is a SequenceFile<Text,StringTuple>, where the key is a document id and the value is the tokenized document.
+ * <p/>
+ */
+public class CollocMapper extends Mapper<Text, StringTuple, GramKey, Gram> {
+
+ private static final byte[] EMPTY = new byte[0];
+
+ public static final String MAX_SHINGLE_SIZE = "maxShingleSize";
+
+ private static final int DEFAULT_MAX_SHINGLE_SIZE = 2;
+
+ public enum Count {
+ NGRAM_TOTAL
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(CollocMapper.class);
+
+ private int maxShingleSize;
+
+ private boolean emitUnigrams;
+
+ /**
+ * Collocation finder: pass 1 map phase.
+ * <p/>
+ * Receives a token stream which gets passed through a Lucene ShingleFilter. The ShingleFilter delivers ngrams of
+ * the appropriate size which are then decomposed into head and tail subgrams which are collected in the
+ * following manner
+ * <p/>
+ * <pre>
+ * k:head_key, v:head_subgram
+ * k:head_key,ngram_key, v:ngram
+ * k:tail_key, v:tail_subgram
+ * k:tail_key,ngram_key, v:ngram
+ * </pre>
+ * <p/>
+ * The 'head' or 'tail' prefix is used to specify whether the subgram in question is the head or tail of the
+ * ngram. In this implementation the head of the ngram is an (n-1)gram, and the tail is a (1)gram.
+ * <p/>
+ * For example, given 'click and clack' and an ngram length of 3:
+ * <pre>
+ * k: head_'click and' v:head_'click and'
+ * k: head_'click and',ngram_'click and clack' v:ngram_'click and clack'
+ * k: tail_'clack', v:tail_'clack'
+ * k: tail_'clack',ngram_'click and clack' v:ngram_'click and clack'
+ * </pre>
+ * <p/>
+ * Also counts the total number of ngrams encountered and adds it to the counter
+ * CollocDriver.Count.NGRAM_TOTAL
+ * </p>
+ *
+ * @throws IOException if there's a problem with the ShingleFilter reading data or the collector collecting output.
+ */
+ @Override
+ protected void map(Text key, StringTuple value, final Context context) throws IOException, InterruptedException {
+
+ try (ShingleFilter sf = new ShingleFilter(new IteratorTokenStream(value.getEntries().iterator()), maxShingleSize)){
+ sf.reset();
+ int count = 0; // ngram count
+
+ OpenObjectIntHashMap<String> ngrams =
+ new OpenObjectIntHashMap<>(value.getEntries().size() * (maxShingleSize - 1));
+ OpenObjectIntHashMap<String> unigrams = new OpenObjectIntHashMap<>(value.getEntries().size());
+
+ do {
+ String term = sf.getAttribute(CharTermAttribute.class).toString();
+ String type = sf.getAttribute(TypeAttribute.class).type();
+ if ("shingle".equals(type)) {
+ count++;
+ ngrams.adjustOrPutValue(term, 1, 1);
+ } else if (emitUnigrams && !term.isEmpty()) { // unigram
+ unigrams.adjustOrPutValue(term, 1, 1);
+ }
+ } while (sf.incrementToken());
+
+ final GramKey gramKey = new GramKey();
+
+ ngrams.forEachPair(new ObjectIntProcedure<String>() {
+ @Override
+ public boolean apply(String term, int frequency) {
+ // obtain components, the leading (n-1)gram and the trailing unigram.
+ int i = term.lastIndexOf(' '); // TODO: fix for non-whitespace delimited languages.
+ if (i != -1) { // bigram, trigram etc
+
+ try {
+ Gram ngram = new Gram(term, frequency, Gram.Type.NGRAM);
+ Gram head = new Gram(term.substring(0, i), frequency, Gram.Type.HEAD);
+ Gram tail = new Gram(term.substring(i + 1), frequency, Gram.Type.TAIL);
+
+ gramKey.set(head, EMPTY);
+ context.write(gramKey, head);
+
+ gramKey.set(head, ngram.getBytes());
+ context.write(gramKey, ngram);
+
+ gramKey.set(tail, EMPTY);
+ context.write(gramKey, tail);
+
+ gramKey.set(tail, ngram.getBytes());
+ context.write(gramKey, ngram);
+
+ } catch (IOException | InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ return true;
+ }
+ });
+
+ unigrams.forEachPair(new ObjectIntProcedure<String>() {
+ @Override
+ public boolean apply(String term, int frequency) {
+ try {
+ Gram unigram = new Gram(term, frequency, Gram.Type.UNIGRAM);
+ gramKey.set(unigram, EMPTY);
+ context.write(gramKey, unigram);
+ } catch (IOException | InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ return true;
+ }
+ });
+
+ context.getCounter(Count.NGRAM_TOTAL).increment(count);
+ sf.end();
+ }
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
+ this.maxShingleSize = conf.getInt(MAX_SHINGLE_SIZE, DEFAULT_MAX_SHINGLE_SIZE);
+
+ this.emitUnigrams = conf.getBoolean(CollocDriver.EMIT_UNIGRAMS, CollocDriver.DEFAULT_EMIT_UNIGRAMS);
+
+ if (log.isInfoEnabled()) {
+ log.info("Max Ngram size is {}", this.maxShingleSize);
+ log.info("Emit Unitgrams is {}", emitUnigrams);
+ }
+ }
+
+}
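
To make the head/tail decomposition in map() concrete, the string logic reduces to the following standalone fragment:

    // A trigram delivered by the ShingleFilter:
    String term = "click and clack";
    int i = term.lastIndexOf(' ');
    String head = term.substring(0, i);   // "click and" -> Gram.Type.HEAD, the (n-1)gram
    String tail = term.substring(i + 1);  // "clack"     -> Gram.Type.TAIL, the unigram
    // map() then writes four pairs per ngram:
    //   (head, EMPTY) -> head     (head, ngram) -> ngram
    //   (tail, EMPTY) -> tail     (tail, ngram) -> ngram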
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocReducer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocReducer.java b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocReducer.java
new file mode 100644
index 0000000..1fe13e3
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocReducer.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.collocations.llr;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reducer for Pass 1 of the collocation identification job. Generates counts for ngrams and subgrams.
+ */
+public class CollocReducer extends Reducer<GramKey, Gram, Gram, Gram> {
+
+ private static final Logger log = LoggerFactory.getLogger(CollocReducer.class);
+
+ public static final String MIN_SUPPORT = "minSupport";
+
+ public static final int DEFAULT_MIN_SUPPORT = 2;
+
+ public enum Skipped {
+ LESS_THAN_MIN_SUPPORT, MALFORMED_KEY_TUPLE, MALFORMED_TUPLE, MALFORMED_TYPES, MALFORMED_UNIGRAM
+ }
+
+ private int minSupport;
+
+ /**
+ * Collocation finder, pass 1 reduce phase:
+ * <p/>
+ * given input from the mapper,
+ *
+ * <pre>
+ * k:head_subgram,ngram, v:ngram:partial freq
+ * k:head_subgram v:head_subgram:partial freq
+ * k:tail_subgram,ngram, v:ngram:partial freq
+ * k:tail_subgram v:tail_subgram:partial freq
+ * k:unigram v:unigram:partial freq
+ * </pre>
+ * Sum gram frequencies and output them for LLR calculation.
+ * <p/>
+ * output is:
+ * <pre>
+ * k:ngram:ngramfreq v:head_subgram:head_subgramfreq
+ * k:ngram:ngramfreq v:tail_subgram:tail_subgramfreq
+ * k:unigram:unigramfreq v:unigram:unigramfreq
+ * </pre>
+ * Each ngram's frequency is essentially counted twice, once for the head and once for the
+ * tail; the frequency should be the same for both. TODO: fix this to count only for the
+ * head and move the count into the value?
+ */
+ @Override
+ protected void reduce(GramKey key, Iterable<Gram> values, Context context) throws IOException, InterruptedException {
+
+ Gram.Type keyType = key.getType();
+
+ if (keyType == Gram.Type.UNIGRAM) {
+ // sum frequencies for unigrams.
+ processUnigram(values.iterator(), context);
+ } else if (keyType == Gram.Type.HEAD || keyType == Gram.Type.TAIL) {
+ // sum frequencies for subgrams, ngram and collect for each ngram.
+ processSubgram(values.iterator(), context);
+ } else {
+ context.getCounter(Skipped.MALFORMED_TYPES).increment(1);
+ }
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
+ this.minSupport = conf.getInt(MIN_SUPPORT, DEFAULT_MIN_SUPPORT);
+
+ boolean emitUnigrams = conf.getBoolean(CollocDriver.EMIT_UNIGRAMS, CollocDriver.DEFAULT_EMIT_UNIGRAMS);
+
+ log.info("Min support is {}", minSupport);
+ log.info("Emit Unitgrams is {}", emitUnigrams);
+ }
+
+ /**
+ * Sum frequencies for unigrams and deliver to the collector
+ */
+ protected void processUnigram(Iterator<Gram> values, Context context)
+ throws IOException, InterruptedException {
+
+ int freq = 0;
+ Gram value = null;
+
+ // accumulate frequencies from values.
+ while (values.hasNext()) {
+ value = values.next();
+ freq += value.getFrequency();
+ }
+
+ if (freq < minSupport) {
+ context.getCounter(Skipped.LESS_THAN_MIN_SUPPORT).increment(1);
+ return;
+ }
+
+ value.setFrequency(freq);
+ context.write(value, value);
+ }
+
+ /** Sum frequencies for subgrams and ngrams and deliver ngram/subgram pairs to the collector.
+ * <p/>
+ * Sort order guarantees that the subgram/subgram pairs will be seen first and then
+ * subgram/ngram1 pairs, subgram/ngram2 pairs ... subgram/ngramN pairs, so frequencies for
+ * ngrams can be calculated here as well.
+ * <p/>
+ * We end up calculating frequencies for ngrams once per subgram (head, tail) here, which is
+ * some extra work.
+ * @throws InterruptedException
+ */
+ protected void processSubgram(Iterator<Gram> values, Context context)
+ throws IOException, InterruptedException {
+
+ Gram subgram = null;
+ Gram currentNgram = null;
+
+ while (values.hasNext()) {
+ Gram value = values.next();
+
+ if (value.getType() == Gram.Type.HEAD || value.getType() == Gram.Type.TAIL) {
+ // collect frequency for subgrams.
+ if (subgram == null) {
+ subgram = new Gram(value);
+ } else {
+ subgram.incrementFrequency(value.getFrequency());
+ }
+ } else if (!value.equals(currentNgram)) {
+ // we've collected frequency for all subgrams and we've encountered a new ngram.
+ // collect the old ngram if there was one and we have sufficient support and
+ // create the new ngram.
+ if (currentNgram != null) {
+ if (currentNgram.getFrequency() < minSupport) {
+ context.getCounter(Skipped.LESS_THAN_MIN_SUPPORT).increment(1);
+ } else {
+ context.write(currentNgram, subgram);
+ }
+ }
+
+ currentNgram = new Gram(value);
+ } else {
+ currentNgram.incrementFrequency(value.getFrequency());
+ }
+ }
+
+ // collect last ngram.
+ if (currentNgram != null) {
+ if (currentNgram.getFrequency() < minSupport) {
+ context.getCounter(Skipped.LESS_THAN_MIN_SUPPORT).increment(1);
+ return;
+ }
+
+ context.write(currentNgram, subgram);
+ }
+ }
+}
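
processSubgram() above relies on the secondary sort delivering the bare
subgram values before the ngram values that share that subgram. A deliberately
simplified sketch of that stream invariant using the Gram class from this
patch (one subgram value, one distinct ngram; the demo class name is
hypothetical, and in a real job the iterator comes from the shuffle):

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.mahout.vectorizer.collocations.llr.Gram;

    public class SubgramStreamDemo {
      public static void main(String[] args) {
        // Subgram first, then its ngrams: the order guaranteed within a group.
        Iterator<Gram> values = Arrays.asList(
            new Gram("best", 2, Gram.Type.HEAD),
            new Gram("best of", 1, Gram.Type.NGRAM),
            new Gram("best of", 1, Gram.Type.NGRAM)).iterator();

        Gram subgram = new Gram(values.next());  // subgram total, already summed here
        Gram current = new Gram(values.next());  // first ngram; accumulate the rest
        while (values.hasNext()) {
          current.incrementFrequency(values.next().getFrequency());
        }
        System.out.println(current + " with " + subgram); // 'best of'[n]:2 with 'best'[h]:2
      }
    }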
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/Gram.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/Gram.java b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/Gram.java
new file mode 100644
index 0000000..58234b3
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/Gram.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.collocations.llr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.mahout.math.Varint;
+
+/**
+ * Writable for holding data generated from the collocation discovery jobs. Depending on the job configuration,
+ * a gram may consist of one or more words. In some contexts this is used to hold a complete ngram, while in others
+ * it holds a part of an existing ngram (subgram). Tracks the frequency of the gram and its position in the ngram
+ * in which it was found.
+ */
+public class Gram extends BinaryComparable implements WritableComparable<BinaryComparable> {
+
+ public enum Type {
+ HEAD('h'),
+ TAIL('t'),
+ UNIGRAM('u'),
+ NGRAM('n');
+
+ private final char x;
+
+ Type(char c) {
+ this.x = c;
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(x);
+ }
+ }
+
+ private byte[] bytes;
+ private int length;
+ private int frequency;
+
+ public Gram() {
+
+ }
+
+ /**
+ * Copy constructor
+ */
+ public Gram(Gram other) {
+ frequency = other.frequency;
+ length = other.length;
+ bytes = other.bytes.clone();
+ }
+
+ /**
+ * Create a gram with a frequency of 1.
+ *
+ * @param ngram
+ * the gram string
+ * @param type
+ * whether the gram is at the head or tail of its text unit or it is a unigram
+ */
+ public Gram(String ngram, Type type) {
+ this(ngram, 1, type);
+ }
+
+ /**
+ * Create a gram with the specified frequency.
+ *
+ * @param ngram
+ * the gram string
+ * @param frequency
+ * the gram frequency
+ * @param type
+ * whether the gram is at the head or tail of its text unit or it is a unigram
+ */
+ public Gram(String ngram, int frequency, Type type) {
+ Preconditions.checkNotNull(ngram);
+ try {
+ // extra character is used for storing type which is part
+ // of the sort key.
+ ByteBuffer bb = Text.encode('\0' + ngram, true);
+ bytes = bb.array();
+ length = bb.limit();
+ } catch (CharacterCodingException e) {
+ throw new IllegalStateException("Should not have happened ",e);
+ }
+
+ encodeType(type, bytes, 0);
+ this.frequency = frequency;
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return bytes;
+ }
+
+ @Override
+ public int getLength() {
+ return length;
+ }
+
+ /**
+ * @return whether the gram is at the head or tail of its text unit, or is a unigram.
+ */
+ public Type getType() {
+ return decodeType(bytes, 0);
+ }
+
+ /**
+ * @return gram term string
+ */
+ public String getString() {
+ try {
+ return Text.decode(bytes, 1, length - 1);
+ } catch (CharacterCodingException e) {
+ throw new IllegalStateException("Should not have happened " + e);
+ }
+ }
+
+ /**
+ * @return gram frequency
+ */
+ public int getFrequency() {
+ return frequency;
+ }
+
+ /**
+ * @param frequency
+ * gram's frequency
+ */
+ public void setFrequency(int frequency) {
+ this.frequency = frequency;
+ }
+
+ public void incrementFrequency(int i) {
+ this.frequency += i;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int newLength = Varint.readUnsignedVarInt(in);
+ setCapacity(newLength, false);
+ in.readFully(bytes, 0, newLength);
+ int newFrequency = Varint.readUnsignedVarInt(in);
+ length = newLength;
+ frequency = newFrequency;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Varint.writeUnsignedVarInt(length, out);
+ out.write(bytes, 0, length);
+ Varint.writeUnsignedVarInt(frequency, out);
+ }
+
+ /* Cribbed from o.a.hadoop.io.Text:
+ * Sets the capacity of this object to <em>at least</em>
+ * {@code len} bytes. If the current buffer is longer,
+ * then the capacity and existing content of the buffer are
+ * unchanged. If {@code len} is larger
+ * than the current capacity, this object's capacity is
+ * increased to match.
+ * @param len the number of bytes we need
+ * @param keepData should the old data be kept
+ */
+ private void setCapacity(int len, boolean keepData) {
+ len++; // extra byte to hold type
+ if (bytes == null || bytes.length < len) {
+ byte[] newBytes = new byte[len];
+ if (bytes != null && keepData) {
+ System.arraycopy(bytes, 0, newBytes, 0, length);
+ }
+ bytes = newBytes;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return '\'' + getString() + "'[" + getType() + "]:" + frequency;
+ }
+
+ public static void encodeType(Type type, byte[] buf, int offset) {
+ switch (type) {
+ case HEAD:
+ buf[offset] = 0x1;
+ break;
+ case TAIL:
+ buf[offset] = 0x2;
+ break;
+ case UNIGRAM:
+ buf[offset] = 0x3;
+ break;
+ case NGRAM:
+ buf[offset] = 0x4;
+ break;
+ default:
+ throw new IllegalStateException("switch/case problem in encodeType");
+ }
+ }
+
+ public static Type decodeType(byte[] buf, int offset) {
+ switch (buf[offset]) {
+ case 0x1:
+ return Type.HEAD;
+ case 0x2:
+ return Type.TAIL;
+ case 0x3:
+ return Type.UNIGRAM;
+ case 0x4:
+ return Type.NGRAM;
+ default:
+ throw new IllegalStateException("switch/case problem in decodeType");
+ }
+ }
+}
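
Gram's wire format, as write()/readFields() above show, is a varint byte
length, the raw bytes (one leading type byte plus the UTF-8 encoded term),
and a varint frequency. A minimal round-trip sketch (the demo class name is
hypothetical):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.mahout.vectorizer.collocations.llr.Gram;

    public class GramRoundTripDemo {
      public static void main(String[] args) throws IOException {
        Gram in = new Gram("best of", 3, Gram.Type.NGRAM);

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(baos));   // varint length, bytes, varint frequency

        Gram out = new Gram();
        out.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
        System.out.println(out);                // 'best of'[n]:3
      }
    }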
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/GramKey.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/GramKey.java b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/GramKey.java
new file mode 100644
index 0000000..e33ed51
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/GramKey.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.collocations.llr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.CharacterCodingException;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.mahout.math.Varint;
+import org.apache.mahout.vectorizer.collocations.llr.Gram.Type;
+
+/** A GramKey, based on the identity fields of Gram (type, string) plus a byte[] used for secondary ordering */
+public final class GramKey extends BinaryComparable implements WritableComparable<BinaryComparable> {
+
+ private int primaryLength;
+ private int length;
+ private byte[] bytes;
+
+ public GramKey() {
+
+ }
+
+ /** Create a GramKey based on the specified Gram and order.
+ *
+ * @param gram the gram supplying the primary (grouping) bytes
+ * @param order secondary sort bytes appended after the gram's bytes
+ */
+ public GramKey(Gram gram, byte[] order) {
+ set(gram, order);
+ }
+
+ /** Set the gram held by this key, along with its secondary sort order bytes. */
+ public void set(Gram gram, byte[] order) {
+ primaryLength = gram.getLength();
+ length = primaryLength + order.length;
+ setCapacity(length, false);
+ System.arraycopy(gram.getBytes(), 0, bytes, 0, primaryLength);
+ if (order.length > 0) {
+ System.arraycopy(order, 0, bytes, primaryLength, order.length);
+ }
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return bytes;
+ }
+
+ @Override
+ public int getLength() {
+ return length;
+ }
+
+ public int getPrimaryLength() {
+ return primaryLength;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int newLength = Varint.readUnsignedVarInt(in);
+ int newPrimaryLength = Varint.readUnsignedVarInt(in);
+ setCapacity(newLength, false);
+ in.readFully(bytes, 0, newLength);
+ length = newLength;
+ primaryLength = newPrimaryLength;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Varint.writeUnsignedVarInt(length, out);
+ Varint.writeUnsignedVarInt(primaryLength, out);
+ out.write(bytes, 0, length);
+ }
+
+ /* Cribbed from o.a.hadoop.io.Text:
+ * Sets the capacity of this object to <em>at least</em>
+ * {@code len} bytes. If the current buffer is longer,
+ * then the capacity and existing content of the buffer are
+ * unchanged. If {@code len} is larger
+ * than the current capacity, this object's capacity is
+ * increased to match.
+ * @param len the number of bytes we need
+ * @param keepData should the old data be kept
+ */
+ private void setCapacity(int len, boolean keepData) {
+ if (bytes == null || bytes.length < len) {
+ byte[] newBytes = new byte[len];
+ if (bytes != null && keepData) {
+ System.arraycopy(bytes, 0, newBytes, 0, length);
+ }
+ bytes = newBytes;
+ }
+ }
+
+ /**
+ * @return whether the gram is at the head or tail of its text unit, or is a unigram.
+ */
+ public Type getType() {
+ return Gram.decodeType(bytes, 0);
+ }
+
+ public String getPrimaryString() {
+ try {
+ return Text.decode(bytes, 1, primaryLength - 1);
+ } catch (CharacterCodingException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return '\'' + getPrimaryString() + "'[" + getType() + ']';
+ }
+}
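
The key layout above is the gram's own bytes (the primary part, used for
grouping and partitioning) followed by optional order bytes (the secondary
part, used only for sorting). A minimal sketch of how the mapper's two writes
per subgram differ only in the secondary part (the demo class name is
hypothetical; EMPTY mirrors the mapper's empty order array):

    import org.apache.mahout.vectorizer.collocations.llr.Gram;
    import org.apache.mahout.vectorizer.collocations.llr.GramKey;

    public class GramKeyLayoutDemo {
      private static final byte[] EMPTY = new byte[0];

      public static void main(String[] args) {
        Gram head = new Gram("best", 1, Gram.Type.HEAD);
        Gram ngram = new Gram("best of", 1, Gram.Type.NGRAM);

        GramKey bare = new GramKey(head, EMPTY);               // subgram alone
        GramKey ordered = new GramKey(head, ngram.getBytes()); // subgram + order bytes

        System.out.println(bare.getPrimaryLength() == ordered.getPrimaryLength()); // true
        System.out.println(bare.getLength() < ordered.getLength());                // true
      }
    }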
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/GramKeyGroupComparator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/GramKeyGroupComparator.java b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/GramKeyGroupComparator.java
new file mode 100644
index 0000000..7b73d71
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/GramKeyGroupComparator.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.collocations.llr;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import java.io.Serializable;
+
+/** Group GramKeys based on their Gram, ignoring the secondary sort key, so that all keys with the same Gram are sent
+ * to the same call of the reduce method, sorted in natural order (for GramKeys).
+ */
+class GramKeyGroupComparator extends WritableComparator implements Serializable {
+
+ GramKeyGroupComparator() {
+ super(GramKey.class, true);
+ }
+
+ @Override
+ public int compare(WritableComparable a, WritableComparable b) {
+ GramKey gka = (GramKey) a;
+ GramKey gkb = (GramKey) b;
+
+ return WritableComparator.compareBytes(gka.getBytes(), 0, gka.getPrimaryLength(),
+ gkb.getBytes(), 0, gkb.getPrimaryLength());
+ }
+
+}
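
The group comparator compares only the first getPrimaryLength() bytes, so a
bare subgram key and a subgram-plus-ngram key fall into the same reduce group
even though their order bytes differ. Since the class is package-private, a
sketch can perform the same comparison directly with
WritableComparator.compareBytes() (demo names are hypothetical):

    import org.apache.hadoop.io.WritableComparator;
    import org.apache.mahout.vectorizer.collocations.llr.Gram;
    import org.apache.mahout.vectorizer.collocations.llr.GramKey;

    public class GroupComparisonDemo {
      public static void main(String[] args) {
        Gram head = new Gram("best", 1, Gram.Type.HEAD);
        Gram ngram = new Gram("best of", 1, Gram.Type.NGRAM);
        GramKey a = new GramKey(head, new byte[0]);
        GramKey b = new GramKey(head, ngram.getBytes());

        // Same primary bytes -> same group, regardless of the order bytes.
        System.out.println(WritableComparator.compareBytes(
            a.getBytes(), 0, a.getPrimaryLength(),
            b.getBytes(), 0, b.getPrimaryLength()) == 0); // true
      }
    }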
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/GramKeyPartitioner.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/GramKeyPartitioner.java b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/GramKeyPartitioner.java
new file mode 100644
index 0000000..a68339f
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/vectorizer/collocations/llr/GramKeyPartitioner.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.vectorizer.collocations.llr;
+
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * Partition GramKeys based on their Gram, ignoring the secondary sort key so that all GramKeys with the same
+ * gram are sent to the same partition.
+ */
+public final class GramKeyPartitioner extends Partitioner<GramKey, Gram> {
+
+ @Override
+ public int getPartition(GramKey key, Gram value, int numPartitions) {
+ int hash = 1;
+ byte[] bytes = key.getBytes();
+ int length = key.getPrimaryLength();
+ // Copied from WritableComparator.hashBytes(), but skipping the first byte (the type byte).
+ for (int i = 1; i < length; i++) {
+ hash = (31 * hash) + bytes[i];
+ }
+ return (hash & Integer.MAX_VALUE) % numPartitions;
+ }
+
+}
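
The partitioner hashes only the primary bytes and, unlike the group
comparator, skips the leading type byte, so keys sharing a gram land on the
same reducer whatever their order bytes. A minimal sketch (the demo class
name is hypothetical):

    import org.apache.mahout.vectorizer.collocations.llr.Gram;
    import org.apache.mahout.vectorizer.collocations.llr.GramKey;
    import org.apache.mahout.vectorizer.collocations.llr.GramKeyPartitioner;

    public class PartitionDemo {
      public static void main(String[] args) {
        Gram head = new Gram("best", 1, Gram.Type.HEAD);
        Gram ngram = new Gram("best of", 1, Gram.Type.NGRAM);
        GramKeyPartitioner p = new GramKeyPartitioner();

        // Order bytes never influence the hash, so both keys share a partition.
        int p1 = p.getPartition(new GramKey(head, new byte[0]), head, 8);
        int p2 = p.getPartition(new GramKey(head, ngram.getBytes()), ngram, 8);
        System.out.println(p1 == p2); // true
      }
    }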