Posted to commits@mahout.apache.org by sr...@apache.org on 2011/01/24 22:42:41 UTC
svn commit: r1062994 - in /mahout/trunk/core/src:
main/java/org/apache/mahout/vectorizer/
main/java/org/apache/mahout/vectorizer/collocations/llr/
main/java/org/apache/mahout/vectorizer/common/
main/java/org/apache/mahout/vectorizer/tfidf/ test/java/or...
Author: srowen
Date: Mon Jan 24 21:42:41 2011
New Revision: 1062994
URL: http://svn.apache.org/viewvc?rev=1062994&view=rev
Log:
MAHOUT-587 Fix exceptions in seq2sparse by more properly using the Configuration object throughout
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DocumentProcessor.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/common/PartialVectorMerger.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java
mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/DictionaryVectorizerTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/DocumentProcessorTest.java
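The recurring pattern in this patch is that each job-launching helper now accepts the caller's Configuration ("baseConf") and derives its per-job Configuration from it, instead of instantiating a bare new Configuration() and silently dropping settings such as the filesystem URI or command-line overrides. A minimal sketch of that pattern, using hypothetical names rather than the actual Mahout signatures:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    // Illustration only: clone the caller's Configuration so cluster settings and
    // -D overrides survive into the per-job Configuration, then layer job-specific
    // properties on top.
    public final class ConfPropagationSketch {
      private ConfPropagationSketch() { }

      static void runJob(Path input, Path output, Configuration baseConf) {
        // The copy constructor preserves everything already set on baseConf...
        Configuration conf = new Configuration(baseConf);
        // ...while job-specific properties are added afterwards.
        conf.set("io.serializations",
            "org.apache.hadoop.io.serializer.JavaSerialization,"
            + "org.apache.hadoop.io.serializer.WritableSerialization");
        // configure and submit the MapReduce job with 'conf' here
      }
    }

The individual diffs below apply exactly this change to DictionaryVectorizer, DocumentProcessor, PartialVectorMerger, TFIDFConverter and the seq2sparse driver.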
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java?rev=1062994&r1=1062993&r2=1062994&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java Mon Jan 24 21:42:41 2011
@@ -98,6 +98,8 @@ public final class DictionaryVectorizer
* @param output
* output directory where {@link org.apache.mahout.math.RandomAccessSparseVector}'s of the document
* are generated
+ * @param baseConf
+ * job configuration
* @param normPower
* L_p norm to be computed
* @param logNormalize
@@ -149,14 +151,15 @@ public final class DictionaryVectorizer
int[] maxTermDimension = new int[1];
List<Path> dictionaryChunks;
if (maxNGramSize == 1) {
- startWordCounting(input, dictionaryJobPath, minSupport);
+ startWordCounting(input, dictionaryJobPath, baseConf, minSupport);
dictionaryChunks = createDictionaryChunks(dictionaryJobPath, output,
- chunkSizeInMegabytes, new LongWritable(), maxTermDimension);
+ baseConf, chunkSizeInMegabytes, new LongWritable(), maxTermDimension);
} else {
CollocDriver.generateAllGrams(input, dictionaryJobPath, baseConf, maxNGramSize,
minSupport, minLLRValue, numReducers);
dictionaryChunks = createDictionaryChunks(new Path(
new Path(output, DICTIONARY_JOB_FOLDER), CollocDriver.NGRAM_OUTPUT_DIRECTORY), output,
+ baseConf,
chunkSizeInMegabytes, new DoubleWritable(), maxTermDimension);
}
@@ -165,15 +168,15 @@ public final class DictionaryVectorizer
for (Path dictionaryChunk : dictionaryChunks) {
Path partialVectorOutputPath = new Path(output, VECTOR_OUTPUT_FOLDER + partialVectorIndex++);
partialVectorPaths.add(partialVectorOutputPath);
- makePartialVectors(input, maxNGramSize, dictionaryChunk, partialVectorOutputPath,
+ makePartialVectors(input, baseConf, maxNGramSize, dictionaryChunk, partialVectorOutputPath,
maxTermDimension[0], sequentialAccess, namedVectors, numReducers);
}
- Configuration conf = new Configuration();
+ Configuration conf = new Configuration(baseConf);
FileSystem fs = FileSystem.get(partialVectorPaths.get(0).toUri(), conf);
Path outputDir = new Path(output, DOCUMENT_VECTOR_OUTPUT_FOLDER);
- PartialVectorMerger.mergePartialVectors(partialVectorPaths, outputDir, normPower, logNormalize,
+ PartialVectorMerger.mergePartialVectors(partialVectorPaths, outputDir, conf, normPower, logNormalize,
maxTermDimension[0], sequentialAccess, namedVectors, numReducers);
HadoopUtil.deletePaths(partialVectorPaths, fs);
}
@@ -181,20 +184,17 @@ public final class DictionaryVectorizer
/**
* Read the feature frequency List which is built at the end of the Word Count Job and assign ids to them.
* This will use constant memory and will run at the speed of your disk read
- *
- * @param wordCountPath
- * @param dictionaryPathBase
- * @throws IOException
*/
private static List<Path> createDictionaryChunks(Path wordCountPath,
Path dictionaryPathBase,
+ Configuration baseConf,
int chunkSizeInMegabytes,
Writable value,
int[] maxTermDimension) throws IOException {
List<Path> chunkPaths = new ArrayList<Path>();
Writable key = new Text();
- Configuration conf = new Configuration();
+ Configuration conf = new Configuration(baseConf);
FileSystem fs = FileSystem.get(wordCountPath.toUri(), conf);
FileStatus[] outputFiles = fs.globStatus(new Path(wordCountPath, OUTPUT_FILES_PATTERN));
@@ -244,6 +244,8 @@ public final class DictionaryVectorizer
*
* @param input
* input directory of the documents in {@link SequenceFile} format
+ * @param baseConf
+ * job configuration
* @param maxNGramSize
* maximum size of ngrams to generate
* @param dictionaryFilePath
@@ -259,6 +261,7 @@ public final class DictionaryVectorizer
* the desired number of reducer tasks
*/
private static void makePartialVectors(Path input,
+ Configuration baseConf,
int maxNGramSize,
Path dictionaryFilePath,
Path output,
@@ -268,7 +271,7 @@ public final class DictionaryVectorizer
int numReducers)
throws IOException, InterruptedException, ClassNotFoundException {
- Configuration conf = new Configuration();
+ Configuration conf = new Configuration(baseConf);
// this conf parameter needs to be set enable serialisation of conf values
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ "org.apache.hadoop.io.serializer.WritableSerialization");
@@ -306,10 +309,10 @@ public final class DictionaryVectorizer
* Count the frequencies of words in parallel using Map/Reduce. The input documents have to be in
* {@link SequenceFile} format
*/
- private static void startWordCounting(Path input, Path output, int minSupport)
+ private static void startWordCounting(Path input, Path output, Configuration baseConf, int minSupport)
throws IOException, InterruptedException, ClassNotFoundException {
- Configuration conf = new Configuration();
+ Configuration conf = new Configuration(baseConf);
// this conf parameter needs to be set enable serialisation of conf values
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ "org.apache.hadoop.io.serializer.WritableSerialization");
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DocumentProcessor.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DocumentProcessor.java?rev=1062994&r1=1062993&r2=1062994&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DocumentProcessor.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DocumentProcessor.java Mon Jan 24 21:42:41 2011
@@ -65,13 +65,12 @@ public final class DocumentProcessor {
* output directory were the {@link StringTuple} token array of each document has to be created
* @param analyzerClass
* The Lucene {@link Analyzer} for tokenizing the UTF-8 text
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
*/
- public static void tokenizeDocuments(Path input, Class<? extends Analyzer> analyzerClass,
- Path output) throws IOException, InterruptedException, ClassNotFoundException {
- Configuration conf = new Configuration();
+ public static void tokenizeDocuments(Path input,
+ Class<? extends Analyzer> analyzerClass,
+ Path output,
+ Configuration baseConf) throws IOException, InterruptedException, ClassNotFoundException {
+ Configuration conf = new Configuration(baseConf);
// this conf parameter needs to be set enable serialisation of conf values
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ "org.apache.hadoop.io.serializer.WritableSerialization");
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java?rev=1062994&r1=1062993&r2=1062994&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java Mon Jan 24 21:42:41 2011
@@ -27,7 +27,9 @@ import org.apache.commons.cli2.builder.G
import org.apache.commons.cli2.commandline.Parser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.lucene.analysis.Analyzer;
+import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.vectorizer.collocations.llr.LLRReducer;
@@ -39,7 +41,7 @@ import org.slf4j.LoggerFactory;
/**
* Converts a given set of sequence files into SparseVectors
*/
-public final class SparseVectorsFromSequenceFiles {
+public final class SparseVectorsFromSequenceFiles extends AbstractJob {
private static final Logger log = LoggerFactory.getLogger(SparseVectorsFromSequenceFiles.class);
@@ -47,6 +49,10 @@ public final class SparseVectorsFromSequ
}
public static void main(String[] args) throws Exception {
+ ToolRunner.run(new SparseVectorsFromSequenceFiles(), args);
+ }
+
+ public int run(String[] args) throws Exception {
DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
ArgumentBuilder abuilder = new ArgumentBuilder();
GroupBuilder gbuilder = new GroupBuilder();
@@ -138,7 +144,7 @@ public final class SparseVectorsFromSequ
if (cmdLine.hasOption(helpOpt)) {
CommandLineUtil.printHelp(group);
- return;
+ return -1;
}
Path inputDir = new Path((String) cmdLine.getValue(inputDirOpt));
@@ -230,8 +236,9 @@ public final class SparseVectorsFromSequ
}
HadoopUtil.overwriteOutput(outputDir);
+ Configuration conf = getConf();
Path tokenizedPath = new Path(outputDir, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER);
- DocumentProcessor.tokenizeDocuments(inputDir, analyzerClass, tokenizedPath);
+ DocumentProcessor.tokenizeDocuments(inputDir, analyzerClass, tokenizedPath, conf);
boolean sequentialAccessOutput = false;
if (cmdLine.hasOption(sequentialAccessVectorOpt)) {
@@ -243,7 +250,6 @@ public final class SparseVectorsFromSequ
namedVectors = true;
}
- Configuration conf = new Configuration();
if (!processIdf) {
DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath, outputDir, conf, minSupport, maxNGramSize,
minLLRValue, norm, logNormalize, reduceTasks, chunkSize, sequentialAccessOutput, namedVectors);
@@ -253,13 +259,14 @@ public final class SparseVectorsFromSequ
TFIDFConverter.processTfIdf(
new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER),
- outputDir, chunkSize, minDf, maxDFPercent, norm, logNormalize,
+ outputDir, conf, chunkSize, minDf, maxDFPercent, norm, logNormalize,
sequentialAccessOutput, namedVectors, reduceTasks);
}
} catch (OptionException e) {
log.error("Exception", e);
CommandLineUtil.printHelp(group);
}
+ return 0;
}
}
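Turning SparseVectorsFromSequenceFiles into an AbstractJob run through ToolRunner is what makes the propagated Configuration useful from the command line: ToolRunner's generic-options parsing absorbs -D, -fs and similar options into the Configuration returned by getConf(), which the driver then hands down as baseConf. A stripped-down sketch of that Tool/ToolRunner wiring, using Hadoop's Configured base class directly instead of Mahout's AbstractJob (class name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    // Minimal Tool: options such as "-D mapred.reduce.tasks=4" parsed by ToolRunner
    // end up in the Configuration that getConf() returns.
    public class SketchTool extends Configured implements Tool {

      @Override
      public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // already populated with generic options
        System.out.println("mapred.reduce.tasks = "
            + conf.get("mapred.reduce.tasks", "(unset)"));
        // launch jobs here, passing 'conf' down as the baseConf
        return 0;
      }

      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new SketchTool(), args));
      }
    }

This is why the helpOpt branch now returns -1 and the method ends with return 0: run(String[]) reports an exit status back to ToolRunner instead of simply falling off the end of main.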
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocDriver.java?rev=1062994&r1=1062993&r2=1062994&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocDriver.java Mon Jan 24 21:42:41 2011
@@ -143,7 +143,7 @@ public final class CollocDriver extends
Path tokenizedPath = new Path(output, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER);
- DocumentProcessor.tokenizeDocuments(input, analyzerClass, tokenizedPath);
+ DocumentProcessor.tokenizeDocuments(input, analyzerClass, tokenizedPath, getConf());
input = tokenizedPath;
} else {
log.info("Input will NOT be preprocessed");
@@ -166,6 +166,8 @@ public final class CollocDriver extends
* input path containing tokenized documents
* @param output
* output path where ngrams are generated including unigrams
+ * @param baseConf
+ * job configuration
* @param maxNGramSize
* minValue = 2.
* @param minSupport
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/common/PartialVectorMerger.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/common/PartialVectorMerger.java?rev=1062994&r1=1062993&r2=1062994&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/common/PartialVectorMerger.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/common/PartialVectorMerger.java Mon Jan 24 21:42:41 2011
@@ -68,6 +68,8 @@ public final class PartialVectorMerger {
* input directory of the vectors in {@link org.apache.hadoop.io.SequenceFile} format
* @param output
* output directory were the partial vectors have to be created
+ * @param baseConf
+ * job configuration
* @param normPower
* The normalization value. Must be greater than or equal to 0 or equal to {@link #NO_NORMALIZING}
* @param dimension
@@ -80,6 +82,7 @@ public final class PartialVectorMerger {
*/
public static void mergePartialVectors(Iterable<Path> partialVectorPaths,
Path output,
+ Configuration baseConf,
float normPower,
boolean logNormalize,
int dimension,
@@ -94,7 +97,7 @@ public final class PartialVectorMerger {
|| !logNormalize,
"normPower must be > 1 and not infinite if log normalization is chosen", normPower);
- Configuration conf = new Configuration();
+ Configuration conf = new Configuration(baseConf);
// this conf parameter needs to be set enable serialisation of conf values
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ "org.apache.hadoop.io.serializer.WritableSerialization");
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java?rev=1062994&r1=1062993&r2=1062994&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java Mon Jan 24 21:42:41 2011
@@ -116,6 +116,7 @@ public final class TFIDFConverter {
*/
public static void processTfIdf(Path input,
Path output,
+ Configuration baseConf,
int chunkSizeInMegabytes,
int minDf,
int maxDFPercent,
@@ -146,8 +147,8 @@ public final class TFIDFConverter {
Path wordCountPath = new Path(output, WORDCOUNT_OUTPUT_FOLDER);
- startDFCounting(input, wordCountPath);
- Pair<Long[], List<Path>> datasetFeatures = createDictionaryChunks(wordCountPath, output, chunkSizeInMegabytes);
+ startDFCounting(input, wordCountPath, baseConf);
+ Pair<Long[], List<Path>> datasetFeatures = createDictionaryChunks(wordCountPath, output, baseConf, chunkSizeInMegabytes);
int partialVectorIndex = 0;
List<Path> partialVectorPaths = new ArrayList<Path>();
@@ -156,6 +157,7 @@ public final class TFIDFConverter {
Path partialVectorOutputPath = new Path(output, VECTOR_OUTPUT_FOLDER + partialVectorIndex++);
partialVectorPaths.add(partialVectorOutputPath);
makePartialVectors(input,
+ baseConf,
datasetFeatures.getFirst()[0],
datasetFeatures.getFirst()[1],
minDf,
@@ -166,13 +168,14 @@ public final class TFIDFConverter {
namedVector);
}
- Configuration conf = new Configuration();
+ Configuration conf = new Configuration(baseConf);
FileSystem fs = FileSystem.get(partialVectorPaths.get(0).toUri(), conf);
Path outputDir = new Path(output, DOCUMENT_VECTOR_OUTPUT_FOLDER);
PartialVectorMerger.mergePartialVectors(partialVectorPaths,
outputDir,
+ baseConf,
normPower,
logNormalize,
datasetFeatures.getFirst()[0].intValue(),
@@ -189,12 +192,13 @@ public final class TFIDFConverter {
*/
private static Pair<Long[], List<Path>> createDictionaryChunks(Path featureCountPath,
Path dictionaryPathBase,
+ Configuration baseConf,
int chunkSizeInMegabytes) throws IOException {
List<Path> chunkPaths = new ArrayList<Path>();
IntWritable key = new IntWritable();
LongWritable value = new LongWritable();
- Configuration conf = new Configuration();
+ Configuration conf = new Configuration(baseConf);
FileSystem fs = FileSystem.get(featureCountPath.toUri(), conf);
FileStatus[] outputFiles = fs.globStatus(new Path(featureCountPath, OUTPUT_FILES_PATTERN));
@@ -267,6 +271,7 @@ public final class TFIDFConverter {
* output vectors should be named, retaining key (doc id) as a label
*/
private static void makePartialVectors(Path input,
+ Configuration baseConf,
Long featureCount,
Long vectorCount,
int minDf,
@@ -277,7 +282,7 @@ public final class TFIDFConverter {
boolean namedVector)
throws IOException, InterruptedException, ClassNotFoundException {
- Configuration conf = new Configuration();
+ Configuration conf = new Configuration(baseConf);
// this conf parameter needs to be set enable serialisation of conf values
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ "org.apache.hadoop.io.serializer.WritableSerialization");
@@ -313,10 +318,10 @@ public final class TFIDFConverter {
* Count the document frequencies of features in parallel using Map/Reduce. The input documents have to be
* in {@link SequenceFile} format
*/
- private static void startDFCounting(Path input, Path output)
+ private static void startDFCounting(Path input, Path output, Configuration baseConf)
throws IOException, InterruptedException, ClassNotFoundException {
- Configuration conf = new Configuration();
+ Configuration conf = new Configuration(baseConf);
// this conf parameter needs to be set enable serialisation of conf values
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ "org.apache.hadoop.io.serializer.WritableSerialization");
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/DictionaryVectorizerTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/DictionaryVectorizerTest.java?rev=1062994&r1=1062993&r2=1062994&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/DictionaryVectorizerTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/DictionaryVectorizerTest.java Mon Jan 24 21:42:41 2011
@@ -95,7 +95,8 @@ public final class DictionaryVectorizerT
Path tfidf = getTestTempDirPath("output/tfidf");
Path tfidfVectors = new Path(tfidf, "tfidf-vectors");
- DocumentProcessor.tokenizeDocuments(inputPath, analyzer, tokenizedDocuments);
+ Configuration conf = new Configuration();
+ DocumentProcessor.tokenizeDocuments(inputPath, analyzer, tokenizedDocuments, conf);
DictionaryVectorizer.createTermFrequencyVectors(tokenizedDocuments,
wordCount,
@@ -114,6 +115,7 @@ public final class DictionaryVectorizerT
TFIDFConverter.processTfIdf(tfVectors,
tfidf,
+ conf,
100,
1,
99,
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/DocumentProcessorTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/DocumentProcessorTest.java?rev=1062994&r1=1062993&r2=1062994&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/DocumentProcessorTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/DocumentProcessorTest.java Mon Jan 24 21:42:41 2011
@@ -35,7 +35,7 @@ public class DocumentProcessorTest exten
writer.append(new Text(documentId2), new Text(text2));
writer.close();
- DocumentProcessor.tokenizeDocuments(input, DefaultAnalyzer.class, output);
+ DocumentProcessor.tokenizeDocuments(input, DefaultAnalyzer.class, output, configuration);
FileStatus[] statuses = fs.listStatus(output);
assertEquals(1, statuses.length);