You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@mahout.apache.org by sr...@apache.org on 2011/03/31 11:25:30 UTC
svn commit: r1087225 [1/7] - in /mahout/trunk:
core/src/main/java/org/apache/mahout/cf/taste/hadoop/
core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/
core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/
core/src/main/java/org/apache/m...
Author: srowen
Date: Thu Mar 31 09:25:25 2011
New Revision: 1087225
URL: http://svn.apache.org/viewvc?rev=1087225&view=rev
Log:
MAHOUT-633: new iterators over SequenceFiles, now used in about 90% of the code that reads SequenceFiles; other refactoring of iterators and of some related HDFS usages (PathFilters); some small style changes along the way too
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/CopyConstructorIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
- copied, changed from r1084222, mahout/trunk/core/src/main/java/org/apache/mahout/common/FileLineIterable.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java
- copied, changed from r1084222, mahout/trunk/core/src/main/java/org/apache/mahout/common/FileLineIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/IteratorsIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/StringRecordIterator.java
- copied, changed from r1084222, mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/TransformingIterator.java
- copied, changed from r1086202, mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/DelegatingIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java
mahout/trunk/core/src/test/java/org/apache/mahout/common/iterator/IteratorsIteratorTest.java
- copied, changed from r1085936, mahout/trunk/core/src/test/java/org/apache/mahout/common/iterator/IteratorUtilsTest.java
Removed:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/OutputLogFilter.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/FileLineIterable.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/FileLineIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/StringRecordIterator.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/io/SequenceFileVectorIterable.java
mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/io/SequenceFileVectorIterableTest.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/model/file/FileDataModel.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/model/file/FileIDMigrator.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/slopeone/file/FileDiffStorage.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/similarity/file/FileItemItemSimilarityIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/TestClassifier.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/InMemoryBayesDatastore.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/io/SequenceFileModelReader.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesWeightSummerDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/NaiveBayesModel.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/trainer/NaiveBayesInstanceMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/trainer/NaiveBayesThetaComplementaryMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/trainer/NaiveBayesThetaMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/trainer/NaiveBayesTrainer.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/trainer/NaiveBayesWeightsMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/RandomSeedGenerator.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/LDADriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/LDAMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/MatrixDiagonalizeJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorMatrixMultiplicationJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/Pair.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/DelegatingIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/IteratorIterable.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/StableFixedSizeSamplingIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/df/data/Data.java
mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/Builder.java
mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/Classifier.java
mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/inmem/InMemBuilder.java
mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/partial/PartialBuilder.java
mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/partial/Step0Job.java
mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/FrequenciesJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/TransactionIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/integer/IntegerTupleIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/convertors/string/StringTupleIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/fpgrowth/FPGrowth.java
mahout/trunk/core/src/main/java/org/apache/mahout/ga/watchmaker/MahoutEvaluator.java
mahout/trunk/core/src/main/java/org/apache/mahout/ga/watchmaker/OutputUtils.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/GivensThinSolver.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototype.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/DocumentProcessor.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFiles.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/collocations/llr/CollocDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/common/PartialVectorMerger.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/classifier/bayes/BayesClassifierSelfTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/classifier/bayes/BayesFileFormatterTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/minhash/TestMinHashClustering.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestVectorCache.java
mahout/trunk/core/src/test/java/org/apache/mahout/common/iterator/SamplerCase.java
mahout/trunk/core/src/test/java/org/apache/mahout/common/iterator/TestFixedSizeSampler.java
mahout/trunk/core/src/test/java/org/apache/mahout/common/iterator/TestStableFixedSizeSampler.java
mahout/trunk/core/src/test/java/org/apache/mahout/df/mapreduce/partial/PartitionBugTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthRetailDataTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/FPGrowthTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/ga/watchmaker/MahoutEvaluatorTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolverCLI.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/TestRowSimilarityJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/DictionaryVectorizerTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/SparseVectorsFromSequenceFilesTest.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/bookcrossing/BookCrossingDataModel.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/grouplens/GroupLensDataModel.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/jester/JesterDataModel.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterator.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track2/TrackItemSimilarity.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/netflix/NetflixDataModel.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/netflix/NetflixFileDataModel.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/netflix/TransposeToByUser.java
mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/bayes/WikipediaDatasetCreatorDriver.java
mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/bayes/WikipediaXmlSplitter.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayClustering.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayFuzzyKMeans.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayKMeans.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayMeanShift.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/minhash/LastfmClusterEvaluator.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/meanshift/Job.java
mahout/trunk/examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleGrouper.java
mahout/trunk/examples/src/main/java/org/apache/mahout/ga/watchmaker/cd/hadoop/CDMahoutEvaluator.java
mahout/trunk/examples/src/main/java/org/apache/mahout/ga/watchmaker/cd/tool/CDInfosTool.java
mahout/trunk/examples/src/main/java/org/apache/mahout/text/WikipediaMapper.java
mahout/trunk/examples/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java
mahout/trunk/examples/src/test/java/org/apache/mahout/ga/watchmaker/cd/FileInfosDatasetTest.java
mahout/trunk/examples/src/test/java/org/apache/mahout/ga/watchmaker/cd/tool/CDInfosToolTest.java
mahout/trunk/utils/src/main/java/org/apache/mahout/benchmark/VectorBenchmarks.java
mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwEvaluator.java
mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/evaluation/ClusterEvaluator.java
mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsDriver.java
mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java
mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java
mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java
mahout/trunk/utils/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/RowIdJob.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/VectorHelper.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/arff/MapBackedARFFModel.java
mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java
mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java
mahout/trunk/utils/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java
mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/RandomVectorIterable.java
mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/csv/CSVVectorIterableTest.java
mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/io/VectorWriterTest.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java Thu Mar 31 09:25:25 2011
@@ -18,12 +18,13 @@
package org.apache.mahout.cf.taste.hadoop;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.map.OpenIntLongHashMap;
@@ -53,16 +54,6 @@ public final class TasteHadoopUtils {
}
/**
- * A path filter used to read files written by Hadoop.
- */
- public static final PathFilter PARTS_FILTER = new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return path.getName().startsWith("part-");
- }
- };
-
- /**
* Maps a long to an int
*/
public static int idToIndex(long id) {
@@ -74,23 +65,11 @@ public final class TasteHadoopUtils {
*/
public static OpenIntLongHashMap readItemIDIndexMap(String itemIDIndexPathStr, Configuration conf) {
OpenIntLongHashMap indexItemIDMap = new OpenIntLongHashMap();
- try {
- Path unqualifiedItemIDIndexPath = new Path(itemIDIndexPathStr);
- FileSystem fs = FileSystem.get(unqualifiedItemIDIndexPath.toUri(), conf);
- Path itemIDIndexPath = new Path(itemIDIndexPathStr).makeQualified(fs);
-
- VarIntWritable index = new VarIntWritable();
- VarLongWritable id = new VarLongWritable();
- for (FileStatus status : fs.listStatus(itemIDIndexPath, PARTS_FILTER)) {
- String path = status.getPath().toString();
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(path).makeQualified(fs), conf);
- while (reader.next(index, id)) {
- indexItemIDMap.put(index.get(), id.get());
- }
- reader.close();
- }
- } catch (IOException ioe) {
- throw new IllegalStateException(ioe);
+ Path itemIDIndexPath = new Path(itemIDIndexPathStr);
+ for (Pair<VarIntWritable,VarLongWritable> record :
+ new SequenceFileDirIterable<VarIntWritable,VarLongWritable>(
+ itemIDIndexPath, PathType.LIST, PathFilters.partFilter(), null, true, conf)) {
+ indexItemIDMap.put(record.getFirst().get(), record.getSecond().get());
}
return indexItemIDMap;
}
@@ -99,8 +78,8 @@ public final class TasteHadoopUtils {
* Reads a text-based outputfile that only contains an int
*/
public static int readIntFromFile(Configuration conf, Path outputDir) throws IOException {
- FileSystem fs = FileSystem.get(outputDir.toUri(), conf);
- Path outputFile = fs.listStatus(outputDir, PARTS_FILTER)[0].getPath();
+ FileSystem fs = outputDir.getFileSystem(conf);
+ Path outputFile = fs.listStatus(outputDir, PathFilters.partFilter())[0].getPath();
InputStream in = null;
try {
in = fs.open(outputFile);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Thu Mar 31 09:25:25 2011
@@ -29,7 +29,7 @@ import org.apache.mahout.cf.taste.impl.c
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
-import org.apache.mahout.common.FileLineIterable;
+import org.apache.mahout.common.iterator.FileLineIterable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
@@ -72,7 +72,7 @@ public final class AggregateAndRecommend
private static final float BOOLEAN_PREF_VALUE = 1.0f;
@Override
- protected void setup(Context context) {
+ protected void setup(Context context) throws IOException {
Configuration jobConf = context.getConfiguration();
recommendationsPerUser = jobConf.getInt(NUM_RECOMMENDATIONS, DEFAULT_NUM_RECOMMENDATIONS);
booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
@@ -93,8 +93,6 @@ public final class AggregateAndRecommend
itemsToRecommendFor.add(Long.parseLong(line));
}
}
- } catch (IOException ioe) {
- throw new IllegalStateException(ioe);
} finally {
IOUtils.closeStream(in);
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java Thu Mar 31 09:25:25 2011
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.impl.common.FastIDSet;
-import org.apache.mahout.common.FileLineIterable;
+import org.apache.mahout.common.iterator.FileLineIterable;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
@@ -45,7 +45,7 @@ public final class UserVectorSplitterMap
private FastIDSet usersToRecommendFor;
@Override
- protected void setup(Context context) {
+ protected void setup(Context context) throws IOException {
Configuration jobConf = context.getConfiguration();
maxPrefsPerUserConsidered = jobConf.getInt(MAX_PREFS_PER_USER_CONSIDERED, DEFAULT_MAX_PREFS_PER_USER_CONSIDERED);
String usersFilePathString = jobConf.get(USERS_FILE);
@@ -59,9 +59,7 @@ public final class UserVectorSplitterMap
in = fs.open(usersFilePath);
for (String line : new FileLineIterable(in)) {
usersToRecommendFor.add(Long.parseLong(line));
- }
- } catch (IOException ioe) {
- throw new IllegalStateException(ioe);
+ }
} finally {
IOUtils.closeStream(in);
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java Thu Mar 31 09:25:25 2011
@@ -57,21 +57,17 @@ public final class RecommenderReducer ex
private int recommendationsPerUser;
@Override
- protected void setup(Context context) {
+ protected void setup(Context context) throws IOException {
Configuration jobConf = context.getConfiguration();
String dataModelFile = jobConf.get(DATA_MODEL_FILE);
String recommenderClassName = jobConf.get(RECOMMENDER_CLASS_NAME);
- FileDataModel fileDataModel;
- try {
- Path dataModelPath = new Path(dataModelFile);
- FileSystem fs = FileSystem.get(dataModelPath.toUri(), jobConf);
- File tempDataFile = File.createTempFile("mahout-taste-hadoop", "txt");
- tempDataFile.deleteOnExit();
- fs.copyToLocalFile(dataModelPath, new Path(tempDataFile.getAbsolutePath()));
- fileDataModel = new FileDataModel(tempDataFile);
- } catch (IOException ioe) {
- throw new IllegalStateException(ioe);
- }
+ Path dataModelPath = new Path(dataModelFile);
+ FileSystem fs = FileSystem.get(dataModelPath.toUri(), jobConf);
+ File tempDataFile = File.createTempFile("mahout-taste-hadoop", "txt");
+ tempDataFile.deleteOnExit();
+ fs.copyToLocalFile(dataModelPath, new Path(tempDataFile.getAbsolutePath()));
+ FileDataModel fileDataModel = new FileDataModel(tempDataFile);
+
try {
Class<? extends Recommender> recommenderClass = Class.forName(recommenderClassName).asSubclass(
Recommender.class);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/model/file/FileDataModel.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/model/file/FileDataModel.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/model/file/FileDataModel.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/model/file/FileDataModel.java Thu Mar 31 09:25:25 2011
@@ -41,7 +41,7 @@ import org.apache.mahout.cf.taste.impl.m
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.model.Preference;
import org.apache.mahout.cf.taste.model.PreferenceArray;
-import org.apache.mahout.common.FileLineIterator;
+import org.apache.mahout.common.iterator.FileLineIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/model/file/FileIDMigrator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/model/file/FileIDMigrator.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/model/file/FileIDMigrator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/model/file/FileIDMigrator.java Thu Mar 31 09:25:25 2011
@@ -25,7 +25,7 @@ import java.util.concurrent.locks.Reentr
import org.apache.mahout.cf.taste.common.Refreshable;
import org.apache.mahout.cf.taste.impl.common.FastByIDMap;
import org.apache.mahout.cf.taste.impl.model.AbstractIDMigrator;
-import org.apache.mahout.common.FileLineIterable;
+import org.apache.mahout.common.iterator.FileLineIterable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/slopeone/file/FileDiffStorage.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/slopeone/file/FileDiffStorage.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/slopeone/file/FileDiffStorage.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/slopeone/file/FileDiffStorage.java Thu Mar 31 09:25:25 2011
@@ -36,7 +36,7 @@ import org.apache.mahout.cf.taste.impl.c
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.model.PreferenceArray;
import org.apache.mahout.cf.taste.recommender.slopeone.DiffStorage;
-import org.apache.mahout.common.FileLineIterator;
+import org.apache.mahout.common.iterator.FileLineIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/similarity/file/FileItemItemSimilarityIterator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/similarity/file/FileItemItemSimilarityIterator.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/similarity/file/FileItemItemSimilarityIterator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/similarity/file/FileItemItemSimilarityIterator.java Thu Mar 31 09:25:25 2011
@@ -18,43 +18,32 @@
package org.apache.mahout.cf.taste.impl.similarity.file;
import org.apache.mahout.cf.taste.impl.similarity.GenericItemSimilarity;
-import org.apache.mahout.common.FileLineIterator;
+import org.apache.mahout.common.iterator.FileLineIterator;
+import org.apache.mahout.common.iterator.TransformingIterator;
import java.io.File;
import java.io.IOException;
-import java.util.Iterator;
import java.util.regex.Pattern;
/**
- * a simple iterator using a {@link org.apache.mahout.common.FileLineIterator} internally, parsing each
+ * a simple iterator using a {@link FileLineIterator} internally, parsing each
* line into an {@link org.apache.mahout.cf.taste.impl.similarity.GenericItemSimilarity.ItemItemSimilarity}
*/
-final class FileItemItemSimilarityIterator implements Iterator<GenericItemSimilarity.ItemItemSimilarity> {
+final class FileItemItemSimilarityIterator
+ extends TransformingIterator<String,GenericItemSimilarity.ItemItemSimilarity> {
private static final Pattern SEPARATOR = Pattern.compile("[,\t]");
- private final FileLineIterator lineIterator;
-
FileItemItemSimilarityIterator(File similaritiesFile) throws IOException {
- lineIterator = new FileLineIterator(similaritiesFile);
- }
-
- @Override
- public boolean hasNext() {
- return lineIterator.hasNext();
+ super(new FileLineIterator(similaritiesFile));
}
@Override
- public GenericItemSimilarity.ItemItemSimilarity next() {
- String line = lineIterator.next();
- String[] tokens = SEPARATOR.split(line);
+ protected GenericItemSimilarity.ItemItemSimilarity transform(String in) {
+ String[] tokens = SEPARATOR.split(in);
return new GenericItemSimilarity.ItemItemSimilarity(Long.parseLong(tokens[0]),
Long.parseLong(tokens[1]),
Double.parseDouble(tokens[2]));
}
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/TestClassifier.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/TestClassifier.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/TestClassifier.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/TestClassifier.java Thu Mar 31 09:25:25 2011
@@ -44,7 +44,7 @@ import org.apache.mahout.classifier.baye
import org.apache.mahout.classifier.bayes.mapreduce.bayes.BayesClassifierDriver;
import org.apache.mahout.classifier.bayes.model.ClassifierContext;
import org.apache.mahout.common.CommandLineUtil;
-import org.apache.mahout.common.FileLineIterable;
+import org.apache.mahout.common.iterator.FileLineIterable;
import org.apache.mahout.common.TimingStatistics;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.nlp.NGrams;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/InMemoryBayesDatastore.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/InMemoryBayesDatastore.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/InMemoryBayesDatastore.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/InMemoryBayesDatastore.java Thu Mar 31 09:25:25 2011
@@ -17,12 +17,9 @@
package org.apache.mahout.classifier.bayes.datastore;
-import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.mahout.classifier.bayes.common.BayesParameters;
import org.apache.mahout.classifier.bayes.exceptions.InvalidDatastoreException;
import org.apache.mahout.classifier.bayes.interfaces.Datastore;
@@ -76,12 +73,7 @@ public class InMemoryBayesDatastore impl
@Override
public void initialize() throws InvalidDatastoreException {
Configuration conf = new Configuration();
- String basePath = params.getBasePath();
- try {
- SequenceFileModelReader.loadModel(this, FileSystem.get(new Path(basePath).toUri(), conf), params, conf);
- } catch (IOException e) {
- throw new InvalidDatastoreException(e);
- }
+ SequenceFileModelReader.loadModel(this, params, conf);
for (String label : getKeys("")) {
log.info("{} {} {} {}", new Object[] {
label,
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/io/SequenceFileModelReader.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/io/SequenceFileModelReader.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/io/SequenceFileModelReader.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/io/SequenceFileModelReader.java Thu Mar 31 09:25:25 2011
@@ -17,20 +17,19 @@
package org.apache.mahout.classifier.bayes.io;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.classifier.bayes.datastore.InMemoryBayesDatastore;
import org.apache.mahout.classifier.bayes.mapreduce.common.BayesConstants;
+import org.apache.mahout.common.Pair;
import org.apache.mahout.common.Parameters;
import org.apache.mahout.common.StringTuple;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,245 +43,145 @@ public final class SequenceFileModelRead
private SequenceFileModelReader() { }
- public static void loadModel(InMemoryBayesDatastore datastore,
- FileSystem fs,
- Parameters params,
- Configuration conf) throws IOException {
-
- loadFeatureWeights(datastore, fs, new Path(params.get("sigma_j")), conf);
- loadLabelWeights(datastore, fs, new Path(params.get("sigma_k")), conf);
- loadSumWeight(datastore, fs, new Path(params.get("sigma_kSigma_j")), conf);
- loadThetaNormalizer(datastore, fs, new Path(params.get("thetaNormalizer")), conf);
- loadWeightMatrix(datastore, fs, new Path(params.get("weight")), conf);
-
+ public static void loadModel(InMemoryBayesDatastore datastore, Parameters params, Configuration conf) {
+ loadFeatureWeights(datastore, new Path(params.get("sigma_j")), conf);
+ loadLabelWeights(datastore, new Path(params.get("sigma_k")), conf);
+ loadSumWeight(datastore, new Path(params.get("sigma_kSigma_j")), conf);
+ loadThetaNormalizer(datastore, new Path(params.get("thetaNormalizer")), conf);
+ loadWeightMatrix(datastore, new Path(params.get("weight")), conf);
}
- public static void loadWeightMatrix(InMemoryBayesDatastore datastore,
- FileSystem fs,
- Path pathPattern,
- Configuration conf) throws IOException {
-
- StringTuple key = new StringTuple();
- DoubleWritable value = new DoubleWritable();
-
- FileStatus[] outputFiles = fs.globStatus(pathPattern);
- for (FileStatus fileStatus : outputFiles) {
- Path path = fileStatus.getPath();
- log.info("{}", path);
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-
- // the key is label,feature
- while (reader.next(key, value)) {
-
- datastore.loadFeatureWeight(key.stringAt(2), key.stringAt(1), value.get());
-
- }
+ public static void loadWeightMatrix(InMemoryBayesDatastore datastore, Path pathPattern, Configuration conf) {
+ // the key is label,feature
+ for (Pair<StringTuple,DoubleWritable> record :
+ new SequenceFileDirIterable<StringTuple,DoubleWritable>(pathPattern, PathType.GLOB, null, null, true, conf)) {
+ StringTuple key = record.getFirst();
+ DoubleWritable value = record.getSecond();
+ datastore.loadFeatureWeight(key.stringAt(2), key.stringAt(1), value.get());
}
}
- public static void loadFeatureWeights(InMemoryBayesDatastore datastore,
- FileSystem fs,
- Path pathPattern,
- Configuration conf) throws IOException {
-
- StringTuple key = new StringTuple();
- DoubleWritable value = new DoubleWritable();
-
- FileStatus[] outputFiles = fs.globStatus(pathPattern);
- for (FileStatus fileStatus : outputFiles) {
- Path path = fileStatus.getPath();
- log.info("{}", path);
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-
- // the key is either _label_ or label,feature
- long count = 0;
- while (reader.next(key, value)) {
- // Sum of weights for a Feature
- if (key.stringAt(0).equals(BayesConstants.FEATURE_SUM)) {
- datastore.setSumFeatureWeight(key.stringAt(1), value.get());
- count++;
- if (count % 50000 == 0) {
- log.info("Read {} feature weights", count);
- }
+ public static void loadFeatureWeights(InMemoryBayesDatastore datastore, Path pathPattern, Configuration conf) {
+ // the key is either _label_ or label,feature
+ long count = 0;
+ for (Pair<StringTuple,DoubleWritable> record :
+ new SequenceFileDirIterable<StringTuple,DoubleWritable>(pathPattern, PathType.GLOB, null, null, true, conf)) {
+ // Sum of weights for a Feature
+ StringTuple key = record.getFirst();
+ DoubleWritable value = record.getSecond();
+ if (key.stringAt(0).equals(BayesConstants.FEATURE_SUM)) {
+ datastore.setSumFeatureWeight(key.stringAt(1), value.get());
+ if (++count % 50000 == 0) {
+ log.info("Read {} feature weights", count);
}
}
}
}
- public static void loadLabelWeights(InMemoryBayesDatastore datastore,
- FileSystem fs,
- Path pathPattern,
- Configuration conf) throws IOException {
-
- StringTuple key = new StringTuple();
- DoubleWritable value = new DoubleWritable();
-
- FileStatus[] outputFiles = fs.globStatus(pathPattern);
- for (FileStatus fileStatus : outputFiles) {
- Path path = fileStatus.getPath();
- log.info("{}", path);
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-
- long count = 0;
- while (reader.next(key, value)) {
- // Sum of weights in a Label
- if (key.stringAt(0).equals(BayesConstants.LABEL_SUM)) {
- datastore.setSumLabelWeight(key.stringAt(1), value.get());
- count++;
- if (count % 10000 == 0) {
- log.info("Read {} label weights", count);
- }
+ public static void loadLabelWeights(InMemoryBayesDatastore datastore, Path pathPattern, Configuration conf) {
+ long count = 0;
+ for (Pair<StringTuple,DoubleWritable> record :
+ new SequenceFileDirIterable<StringTuple,DoubleWritable>(pathPattern, PathType.GLOB, null, null, true, conf)) {
+ // Sum of weights in a Label
+ StringTuple key = record.getFirst();
+ DoubleWritable value = record.getSecond();
+ if (key.stringAt(0).equals(BayesConstants.LABEL_SUM)) {
+ datastore.setSumLabelWeight(key.stringAt(1), value.get());
+ if (++count % 10000 == 0) {
+ log.info("Read {} label weights", count);
}
}
}
}
- public static void loadThetaNormalizer(InMemoryBayesDatastore datastore,
- FileSystem fs,
- Path pathPattern,
- Configuration conf) throws IOException {
-
- StringTuple key = new StringTuple();
- DoubleWritable value = new DoubleWritable();
-
- FileStatus[] outputFiles = fs.globStatus(pathPattern);
- for (FileStatus fileStatus : outputFiles) {
- Path path = fileStatus.getPath();
- log.info("{}", path);
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-
- long count = 0;
- while (reader.next(key, value)) {
- // Sum of weights in a Label
- if (key.stringAt(0).equals(BayesConstants.LABEL_THETA_NORMALIZER)) {
- datastore.setThetaNormalizer(key.stringAt(1), value.get());
- count++;
- if (count % 50000 == 0) {
- log.info("Read {} theta norms", count);
- }
+ public static void loadThetaNormalizer(InMemoryBayesDatastore datastore, Path pathPattern, Configuration conf) {
+ long count = 0;
+ for (Pair<StringTuple,DoubleWritable> record :
+ new SequenceFileDirIterable<StringTuple,DoubleWritable>(pathPattern, PathType.GLOB, null, null, true, conf)) {
+ StringTuple key = record.getFirst();
+ DoubleWritable value = record.getSecond();
+ // Sum of weights in a Label
+ if (key.stringAt(0).equals(BayesConstants.LABEL_THETA_NORMALIZER)) {
+ datastore.setThetaNormalizer(key.stringAt(1), value.get());
+ if (++count % 50000 == 0) {
+ log.info("Read {} theta norms", count);
}
}
}
}
- public static void loadSumWeight(InMemoryBayesDatastore datastore,
- FileSystem fs,
- Path pathPattern,
- Configuration conf) throws IOException {
-
- StringTuple key = new StringTuple();
- DoubleWritable value = new DoubleWritable();
-
- FileStatus[] outputFiles = fs.globStatus(pathPattern);
- for (FileStatus fileStatus : outputFiles) {
- Path path = fileStatus.getPath();
- log.info("{}", path);
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-
- // the key is _label
- while (reader.next(key, value)) {
-
- if (key.stringAt(0).equals(BayesConstants.TOTAL_SUM)) { // Sum of
- // weights for
- // all Features and all Labels
- datastore.setSigmaJSigmaK(value.get());
- log.info("{}", value.get());
- }
+ public static void loadSumWeight(InMemoryBayesDatastore datastore, Path pathPattern, Configuration conf) {
+ // the key is _label
+ for (Pair<StringTuple,DoubleWritable> record :
+ new SequenceFileDirIterable<StringTuple,DoubleWritable>(pathPattern, PathType.GLOB, null, null, true, conf)) {
+ StringTuple key = record.getFirst();
+ DoubleWritable value = record.getSecond();
+ if (key.stringAt(0).equals(BayesConstants.TOTAL_SUM)) {
+ // Sum of weights for all Features and all Labels
+ datastore.setSigmaJSigmaK(value.get());
+ log.info("{}", value.get());
}
}
}
- public static Map<String,Double> readLabelSums(FileSystem fs,
- Path pathPattern,
- Configuration conf) throws IOException {
+ public static Map<String,Double> readLabelSums(Path pathPattern, Configuration conf) {
Map<String,Double> labelSum = new HashMap<String,Double>();
- StringTuple key = new StringTuple();
- DoubleWritable value = new DoubleWritable();
-
- FileStatus[] outputFiles = fs.globStatus(pathPattern);
-
- for (FileStatus fileStatus : outputFiles) {
- Path path = fileStatus.getPath();
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- // the key is either _label_ or label,feature
- while (reader.next(key, value)) {
- if (key.stringAt(0).equals(BayesConstants.LABEL_SUM)) { // Sum of counts
- // of labels
- labelSum.put(key.stringAt(1), value.get());
- }
-
+ // the key is either _label_ or label,feature
+ for (Pair<StringTuple,DoubleWritable> record :
+ new SequenceFileDirIterable<StringTuple,DoubleWritable>(pathPattern, PathType.GLOB, null, null, true, conf)) {
+ StringTuple key = record.getFirst();
+ DoubleWritable value = record.getSecond();
+ if (key.stringAt(0).equals(BayesConstants.LABEL_SUM)) {
+ // Sum of counts of labels
+ labelSum.put(key.stringAt(1), value.get());
}
}
-
return labelSum;
}
- public static Map<String,Double> readLabelDocumentCounts(FileSystem fs,
- Path pathPattern,
- Configuration conf) throws IOException {
+ public static Map<String,Double> readLabelDocumentCounts(Path pathPattern, Configuration conf) {
Map<String,Double> labelDocumentCounts = new HashMap<String,Double>();
- StringTuple key = new StringTuple();
- DoubleWritable value = new DoubleWritable();
-
- FileStatus[] outputFiles = fs.globStatus(pathPattern);
- for (FileStatus fileStatus : outputFiles) {
- Path path = fileStatus.getPath();
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- // the key is either _label_ or label,feature
- while (reader.next(key, value)) {
- // Count of Documents in a Label
- if (key.stringAt(0).equals(BayesConstants.LABEL_COUNT)) {
- labelDocumentCounts.put(key.stringAt(1), value.get());
- }
-
+ // the key is either _label_ or label,feature
+ for (Pair<StringTuple,DoubleWritable> record :
+ new SequenceFileDirIterable<StringTuple,DoubleWritable>(pathPattern, PathType.GLOB, null, null, true, conf)) {
+ StringTuple key = record.getFirst();
+ DoubleWritable value = record.getSecond();
+ // Count of Documents in a Label
+ if (key.stringAt(0).equals(BayesConstants.LABEL_COUNT)) {
+ labelDocumentCounts.put(key.stringAt(1), value.get());
}
}
-
return labelDocumentCounts;
}
- public static double readSigmaJSigmaK(FileSystem fs, Path pathPattern, Configuration conf) throws IOException {
+ public static double readSigmaJSigmaK(Path pathPattern, Configuration conf) {
Map<String,Double> weightSum = new HashMap<String,Double>();
- StringTuple key = new StringTuple();
- DoubleWritable value = new DoubleWritable();
-
- FileStatus[] outputFiles = fs.globStatus(pathPattern);
- for (FileStatus fileStatus : outputFiles) {
- Path path = fileStatus.getPath();
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- while (reader.next(key, value)) {
- if (weightSum.size() > 1) {
- throw new IOException("Incorrect Sum File");
- } else if (key.stringAt(0).equals(BayesConstants.TOTAL_SUM)) {
- weightSum.put(BayesConstants.TOTAL_SUM, value.get());
- }
-
+ for (Pair<StringTuple,DoubleWritable> record :
+ new SequenceFileDirIterable<StringTuple,DoubleWritable>(pathPattern, PathType.GLOB, null, null, true, conf)) {
+ StringTuple key = record.getFirst();
+ DoubleWritable value = record.getSecond();
+ if (weightSum.size() > 1) {
+ throw new IllegalStateException("Incorrect Sum File");
+ } else if (key.stringAt(0).equals(BayesConstants.TOTAL_SUM)) {
+ weightSum.put(BayesConstants.TOTAL_SUM, value.get());
}
}
-
return weightSum.get(BayesConstants.TOTAL_SUM);
}
- public static double readVocabCount(FileSystem fs, Path pathPattern, Configuration conf) throws IOException {
+ public static double readVocabCount(Path pathPattern, Configuration conf) {
Map<String,Double> weightSum = new HashMap<String,Double>();
- StringTuple key = new StringTuple();
- DoubleWritable value = new DoubleWritable();
-
- FileStatus[] outputFiles = fs.globStatus(pathPattern);
- for (FileStatus fileStatus : outputFiles) {
- Path path = fileStatus.getPath();
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- while (reader.next(key, value)) {
- if (weightSum.size() > 1) {
- throw new IOException("Incorrect vocabCount File");
- }
- if (key.stringAt(0).equals(BayesConstants.FEATURE_SET_SIZE)) {
- weightSum.put(BayesConstants.FEATURE_SET_SIZE, value.get());
- }
-
+ for (Pair<StringTuple,DoubleWritable> record :
+ new SequenceFileDirIterable<StringTuple,DoubleWritable>(pathPattern, PathType.GLOB, null, null, true, conf)) {
+ if (weightSum.size() > 1) {
+ throw new IllegalStateException("Incorrect vocabCount File");
+ }
+ StringTuple key = record.getFirst();
+ DoubleWritable value = record.getSecond();
+ if (key.stringAt(0).equals(BayesConstants.FEATURE_SET_SIZE)) {
+ weightSum.put(BayesConstants.FEATURE_SET_SIZE, value.get());
}
}
-
return weightSum.get(BayesConstants.FEATURE_SET_SIZE);
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierDriver.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierDriver.java Thu Mar 31 09:25:25 2011
@@ -23,11 +23,8 @@ import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
@@ -36,8 +33,11 @@ import org.apache.hadoop.mapred.KeyValue
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.mahout.classifier.ConfusionMatrix;
import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
import org.apache.mahout.common.Parameters;
import org.apache.mahout.common.StringTuple;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,46 +74,35 @@ public final class BayesClassifierDriver
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ "org.apache.hadoop.io.serializer.WritableSerialization");
- HadoopUtil.overwriteOutput(outPath);
+ HadoopUtil.delete(conf, outPath);
conf.set("bayes.parameters", params.toString());
client.setConf(conf);
JobClient.runJob(conf);
Path outputFiles = new Path(outPath, "part*");
- FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
- ConfusionMatrix matrix = readResult(dfs, outputFiles, conf, params);
+ ConfusionMatrix matrix = readResult(outputFiles, conf, params);
log.info("{}", matrix.summarize());
}
- public static ConfusionMatrix readResult(FileSystem fs,
- Path pathPattern,
- Configuration conf,
- Parameters params) throws IOException {
-
- StringTuple key = new StringTuple();
- DoubleWritable value = new DoubleWritable();
+ public static ConfusionMatrix readResult(Path pathPattern, Configuration conf, Parameters params) {
String defaultLabel = params.get("defaultCat");
- FileStatus[] outputFiles = fs.globStatus(pathPattern);
Map<String,Map<String,Integer>> confusionMatrix = new HashMap<String,Map<String,Integer>>();
-
- for (FileStatus fileStatus : outputFiles) {
- Path path = fileStatus.getPath();
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- while (reader.next(key, value)) {
- String correctLabel = key.stringAt(1);
- String classifiedLabel = key.stringAt(2);
- Map<String,Integer> rowMatrix = confusionMatrix.get(correctLabel);
- if (rowMatrix == null) {
- rowMatrix = new HashMap<String,Integer>();
- }
- Integer count = Double.valueOf(value.get()).intValue();
- rowMatrix.put(classifiedLabel, count);
- confusionMatrix.put(correctLabel, rowMatrix);
-
+ for (Pair<StringTuple,DoubleWritable> record :
+ new SequenceFileDirIterable<StringTuple,DoubleWritable>(pathPattern, PathType.GLOB, null, null, true, conf)) {
+ StringTuple key = record.getFirst();
+ DoubleWritable value = record.getSecond();
+ String correctLabel = key.stringAt(1);
+ String classifiedLabel = key.stringAt(2);
+ Map<String,Integer> rowMatrix = confusionMatrix.get(correctLabel);
+ if (rowMatrix == null) {
+ rowMatrix = new HashMap<String,Integer>();
}
+ Integer count = Double.valueOf(value.get()).intValue();
+ rowMatrix.put(classifiedLabel, count);
+ confusionMatrix.put(correctLabel, rowMatrix);
}
-
+
ConfusionMatrix matrix = new ConfusionMatrix(confusionMatrix.keySet(), defaultLabel);
for (Map.Entry<String,Map<String,Integer>> correctLabelSet : confusionMatrix.entrySet()) {
Map<String,Integer> rowMatrix = correctLabelSet.getValue();
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java Thu Mar 31 09:25:25 2011
@@ -19,6 +19,7 @@ package org.apache.mahout.classifier.bay
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.classifier.bayes.common.BayesParameters;
import org.apache.mahout.classifier.bayes.mapreduce.common.BayesFeatureDriver;
@@ -36,7 +37,9 @@ public class BayesDriver implements Baye
@Override
public void runJob(Path input, Path output, BayesParameters params) throws IOException {
- HadoopUtil.overwriteOutput(output);
+
+ Configuration conf = new Configuration();
+ HadoopUtil.delete(conf, output);
log.info("Reading features...");
// Read the features in each document normalized by length of each document
@@ -64,22 +67,22 @@ public class BayesDriver implements Baye
}
Path docCountOutPath = new Path(output, "trainer-docCount");
- HadoopUtil.overwriteOutput(docCountOutPath);
+ HadoopUtil.delete(conf, docCountOutPath);
Path termDocCountOutPath = new Path(output, "trainer-termDocCount");
- HadoopUtil.overwriteOutput(termDocCountOutPath);
+ HadoopUtil.delete(conf, termDocCountOutPath);
Path featureCountOutPath = new Path(output, "trainer-featureCount");
- HadoopUtil.overwriteOutput(featureCountOutPath);
+ HadoopUtil.delete(conf, featureCountOutPath);
Path wordFreqOutPath = new Path(output, "trainer-wordFreq");
- HadoopUtil.overwriteOutput(wordFreqOutPath);
+ HadoopUtil.delete(conf, wordFreqOutPath);
Path vocabCountPath = new Path(output, "trainer-tfIdf/trainer-vocabCount");
- HadoopUtil.overwriteOutput(vocabCountPath);
+ HadoopUtil.delete(conf, vocabCountPath);
Path vocabCountOutPath = new Path(output, "trainer-vocabCount");
- HadoopUtil.overwriteOutput(vocabCountOutPath);
+ HadoopUtil.delete(conf, vocabCountOutPath);
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerDriver.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerDriver.java Thu Mar 31 09:25:25 2011
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.DoubleWritable;
@@ -69,11 +68,10 @@ public class BayesThetaNormalizerDriver
// Dont ever forget this. People should keep track of how hadoop conf
// parameters and make or break a piece of code
- HadoopUtil.overwriteOutput(outPath);
- FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
-
+ HadoopUtil.delete(conf, outPath);
+
Path sigmaKFiles = new Path(output, "trainer-weights/Sigma_k/*");
- Map<String,Double> labelWeightSum = SequenceFileModelReader.readLabelSums(dfs, sigmaKFiles, conf);
+ Map<String,Double> labelWeightSum = SequenceFileModelReader.readLabelSums(sigmaKFiles, conf);
DefaultStringifier<Map<String,Double>> mapStringifier = new DefaultStringifier<Map<String,Double>>(conf,
GenericsUtil.getClass(labelWeightSum));
String labelWeightSumString = mapStringifier.toString(labelWeightSum);
@@ -84,7 +82,7 @@ public class BayesThetaNormalizerDriver
conf.set("cnaivebayes.sigma_k", labelWeightSumString);
Path sigmaJSigmaKFile = new Path(output, "trainer-weights/Sigma_kSigma_j/*");
- double sigmaJSigmaK = SequenceFileModelReader.readSigmaJSigmaK(dfs, sigmaJSigmaKFile, conf);
+ double sigmaJSigmaK = SequenceFileModelReader.readSigmaJSigmaK(sigmaJSigmaKFile, conf);
DefaultStringifier<Double> stringifier = new DefaultStringifier<Double>(conf, Double.class);
String sigmaJSigmaKString = stringifier.toString(sigmaJSigmaK);
@@ -94,7 +92,7 @@ public class BayesThetaNormalizerDriver
conf.set("cnaivebayes.sigma_jSigma_k", sigmaJSigmaKString);
Path vocabCountFile = new Path(output, "trainer-tfIdf/trainer-vocabCount/*");
- double vocabCount = SequenceFileModelReader.readVocabCount(dfs, vocabCountFile, conf);
+ double vocabCount = SequenceFileModelReader.readVocabCount(vocabCountFile, conf);
String vocabCountString = stringifier.toString(vocabCount);
log.info("Vocabulary Count");
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java Thu Mar 31 09:25:25 2011
@@ -19,6 +19,7 @@ package org.apache.mahout.classifier.bay
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.classifier.bayes.common.BayesParameters;
import org.apache.mahout.classifier.bayes.mapreduce.common.BayesFeatureDriver;
@@ -36,7 +37,8 @@ public class CBayesDriver implements Bay
@Override
public void runJob(Path input, Path output, BayesParameters params) throws IOException {
- HadoopUtil.overwriteOutput(output);
+ Configuration conf = new Configuration();
+ HadoopUtil.delete(conf, output);
log.info("Reading features...");
// Read the features in each document normalized by length of each document
@@ -64,22 +66,22 @@ public class CBayesDriver implements Bay
}
Path docCountOutPath = new Path(output, "trainer-docCount");
- HadoopUtil.overwriteOutput(docCountOutPath);
+ HadoopUtil.delete(conf, docCountOutPath);
Path termDocCountOutPath = new Path(output, "trainer-termDocCount");
- HadoopUtil.overwriteOutput(termDocCountOutPath);
+ HadoopUtil.delete(conf, termDocCountOutPath);
Path featureCountOutPath = new Path(output, "trainer-featureCount");
- HadoopUtil.overwriteOutput(featureCountOutPath);
+ HadoopUtil.delete(conf, featureCountOutPath);
Path wordFreqOutPath = new Path(output, "trainer-wordFreq");
- HadoopUtil.overwriteOutput(wordFreqOutPath);
+ HadoopUtil.delete(conf, wordFreqOutPath);
Path vocabCountPath = new Path(output, "trainer-tfIdf/trainer-vocabCount");
- HadoopUtil.overwriteOutput(vocabCountPath);
+ HadoopUtil.delete(conf, vocabCountPath);
Path vocabCountOutPath = new Path(output, "trainer-vocabCount");
- HadoopUtil.overwriteOutput(vocabCountOutPath);
+ HadoopUtil.delete(conf, vocabCountOutPath);
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerDriver.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerDriver.java Thu Mar 31 09:25:25 2011
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.DoubleWritable;
@@ -64,17 +63,16 @@ public class CBayesThetaNormalizerDriver
conf.setCombinerClass(CBayesThetaNormalizerReducer.class);
conf.setReducerClass(CBayesThetaNormalizerReducer.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
- conf
- .set("io.serializations",
- "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
+ conf.set("io.serializations",
+ "org.apache.hadoop.io.serializer.JavaSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
// Dont ever forget this. People should keep track of how hadoop conf
// parameters and make or break a piece of code
- FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
- HadoopUtil.overwriteOutput(outPath);
+ HadoopUtil.delete(conf, outPath);
Path sigmaKFiles = new Path(output, "trainer-weights/Sigma_k/*");
- Map<String,Double> labelWeightSum = SequenceFileModelReader.readLabelSums(dfs, sigmaKFiles, conf);
+ Map<String,Double> labelWeightSum = SequenceFileModelReader.readLabelSums(sigmaKFiles, conf);
DefaultStringifier<Map<String,Double>> mapStringifier = new DefaultStringifier<Map<String,Double>>(conf,
GenericsUtil.getClass(labelWeightSum));
String labelWeightSumString = mapStringifier.toString(labelWeightSum);
@@ -85,7 +83,7 @@ public class CBayesThetaNormalizerDriver
conf.set("cnaivebayes.sigma_k", labelWeightSumString);
Path sigmaKSigmaJFile = new Path(output, "trainer-weights/Sigma_kSigma_j/*");
- double sigmaJSigmaK = SequenceFileModelReader.readSigmaJSigmaK(dfs, sigmaKSigmaJFile, conf);
+ double sigmaJSigmaK = SequenceFileModelReader.readSigmaJSigmaK(sigmaKSigmaJFile, conf);
DefaultStringifier<Double> stringifier = new DefaultStringifier<Double>(conf, Double.class);
String sigmaJSigmaKString = stringifier.toString(sigmaJSigmaK);
@@ -95,7 +93,7 @@ public class CBayesThetaNormalizerDriver
conf.set("cnaivebayes.sigma_jSigma_k", sigmaJSigmaKString);
Path vocabCountFile = new Path(output, "trainer-tfIdf/trainer-vocabCount/*");
- double vocabCount = SequenceFileModelReader.readVocabCount(dfs, vocabCountFile, conf);
+ double vocabCount = SequenceFileModelReader.readVocabCount(vocabCountFile, conf);
String vocabCountString = stringifier.toString(vocabCount);
log.info("Vocabulary Count");
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java Thu Mar 31 09:25:25 2011
@@ -56,7 +56,7 @@ public class BayesFeatureDriver implemen
"org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
// this conf parameter needs to be set enable serialisation of conf values
- HadoopUtil.overwriteOutput(output);
+ HadoopUtil.delete(conf, output);
conf.set("bayes.parameters", params.toString());
client.setConf(conf);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfDriver.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfDriver.java Thu Mar 31 09:25:25 2011
@@ -22,7 +22,6 @@ import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -71,18 +70,15 @@ public class BayesTfIdfDriver implements
conf.setOutputFormat(BayesTfIdfOutputFormat.class);
- conf
- .set("io.serializations",
- "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
+ conf.set("io.serializations",
+ "org.apache.hadoop.io.serializer.JavaSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
// Dont ever forget this. People should keep track of how hadoop conf
// parameters and make or break a piece of code
-
- FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
- HadoopUtil.overwriteOutput(outPath);
-
+ HadoopUtil.delete(conf, outPath);
Path interimFile = new Path(output, "trainer-docCount/part-*");
- Map<String,Double> labelDocumentCounts = SequenceFileModelReader.readLabelDocumentCounts(dfs, interimFile, conf);
+ Map<String,Double> labelDocumentCounts = SequenceFileModelReader.readLabelDocumentCounts(interimFile, conf);
DefaultStringifier<Map<String,Double>> mapStringifier = new DefaultStringifier<Map<String,Double>>(conf,
GenericsUtil.getClass(labelDocumentCounts));
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesWeightSummerDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesWeightSummerDriver.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesWeightSummerDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesWeightSummerDriver.java Thu Mar 31 09:25:25 2011
@@ -46,7 +46,7 @@ public class BayesWeightSummerDriver imp
FileInputFormat.addInputPath(conf, new Path(output, "trainer-tfIdf/trainer-tfIdf"));
Path outPath = new Path(output, "trainer-weights");
FileOutputFormat.setOutputPath(conf, outPath);
- HadoopUtil.overwriteOutput(outPath);
+ HadoopUtil.delete(conf, outPath);
// conf.setNumReduceTasks(1);
// conf.setNumMapTasks(100);
conf.setMapperClass(BayesWeightSummerMapper.class);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/NaiveBayesModel.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/NaiveBayesModel.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/NaiveBayesModel.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/NaiveBayesModel.java Thu Mar 31 09:25:25 2011
@@ -17,17 +17,15 @@
package org.apache.mahout.classifier.naivebayes;
-import java.io.IOException;
import java.lang.reflect.Type;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.mahout.classifier.naivebayes.trainer.NaiveBayesTrainer;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
import org.apache.mahout.math.JsonMatrixAdapter;
import org.apache.mahout.math.JsonVectorAdapter;
import org.apache.mahout.math.Matrix;
@@ -143,23 +141,21 @@ public class NaiveBayesModel implements
}
// CODE USED FOR SERIALIZATION
- public static NaiveBayesModel fromMRTrainerOutput(Path output, Configuration conf) throws IOException {
+ public static NaiveBayesModel fromMRTrainerOutput(Path output, Configuration conf) {
Path classVectorPath = new Path(output, NaiveBayesTrainer.CLASS_VECTORS);
Path sumVectorPath = new Path(output, NaiveBayesTrainer.SUM_VECTORS);
Path thetaSumPath = new Path(output, NaiveBayesTrainer.THETA_SUM);
NaiveBayesModel model = new NaiveBayesModel();
model.setAlphaI(conf.getFloat(NaiveBayesTrainer.ALPHA_I, 1.0f));
-
- FileSystem fs = sumVectorPath.getFileSystem(conf);
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, sumVectorPath, conf);
- Writable key = new Text();
- VectorWritable value = new VectorWritable();
int featureCount = 0;
int labelCount = 0;
// read feature sums and label sums
- while (reader.next(key, value)) {
+ for (Pair<Text,VectorWritable> record :
+ new SequenceFileIterable<Text, VectorWritable>(sumVectorPath, true, conf)) {
+ Text key = record.getFirst();
+ VectorWritable value = record.getSecond();
if (key.toString().equals(BayesConstants.FEATURE_SUM)) {
model.setFeatureSum(value.get());
featureCount = value.get().getNumNondefaultElements();
@@ -170,30 +166,28 @@ public class NaiveBayesModel implements
labelCount = value.get().size();
}
}
- reader.close();
-
+
// read the class matrix
- reader = new SequenceFile.Reader(fs, classVectorPath, conf);
- IntWritable label = new IntWritable();
Matrix matrix = new SparseMatrix(new int[] {labelCount, featureCount});
- while (reader.next(label, value)) {
+ for (Pair<IntWritable,VectorWritable> record :
+ new SequenceFileIterable<IntWritable,VectorWritable>(classVectorPath, true, conf)) {
+ IntWritable label = record.getFirst();
+ VectorWritable value = record.getSecond();
matrix.assignRow(label.get(), value.get());
}
- reader.close();
model.setWeightMatrix(matrix);
-
-
-
- reader = new SequenceFile.Reader(fs, thetaSumPath, conf);
+
// read theta normalizer
- while (reader.next(key, value)) {
+ for (Pair<Text,VectorWritable> record :
+ new SequenceFileIterable<Text,VectorWritable>(thetaSumPath, true, conf)) {
+ Text key = record.getFirst();
+ VectorWritable value = record.getSecond();
if (key.toString().equals(BayesConstants.LABEL_THETA_NORMALIZER)) {
model.setPerlabelThetaNormalizer(value.get());
}
}
- reader.close();
-
+
return model;
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/trainer/NaiveBayesInstanceMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/trainer/NaiveBayesInstanceMapper.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/trainer/NaiveBayesInstanceMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/trainer/NaiveBayesInstanceMapper.java Thu Mar 31 09:25:25 2011
@@ -22,13 +22,13 @@ import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.map.OpenObjectIntHashMap;
@@ -51,23 +51,15 @@ public class NaiveBayesInstanceMapper ex
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
- try {
- URI[] localFiles = DistributedCache.getCacheFiles(conf);
- if (localFiles == null || localFiles.length < 1) {
- throw new IllegalArgumentException("missing paths from the DistributedCache");
- }
- Path labelMapFile = new Path(localFiles[0].getPath());
- FileSystem fs = labelMapFile.getFileSystem(conf);
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, labelMapFile, conf);
- Writable key = new Text();
- IntWritable value = new IntWritable();
-
- // key is word value is id
- while (reader.next(key, value)) {
- labelMap.put(key.toString(), value.get());
- }
- } catch (IOException e) {
- throw new IllegalStateException(e);
+ URI[] localFiles = DistributedCache.getCacheFiles(conf);
+ if (localFiles == null || localFiles.length < 1) {
+ throw new IllegalArgumentException("missing paths from the DistributedCache");
+ }
+ Path labelMapFile = new Path(localFiles[0].getPath());
+ // key is word value is id
+ for (Pair<Writable,IntWritable> record :
+ new SequenceFileIterable<Writable,IntWritable>(labelMapFile, true, conf)) {
+ labelMap.put(record.getFirst().toString(), record.getSecond().get());
}
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/trainer/NaiveBayesThetaComplementaryMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/trainer/NaiveBayesThetaComplementaryMapper.java?rev=1087225&r1=1087224&r2=1087225&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/trainer/NaiveBayesThetaComplementaryMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/trainer/NaiveBayesThetaComplementaryMapper.java Thu Mar 31 09:25:25 2011
@@ -23,14 +23,14 @@ import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.classifier.naivebayes.BayesConstants;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
@@ -66,42 +66,31 @@ public class NaiveBayesThetaComplementar
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
- try {
- URI[] localFiles = DistributedCache.getCacheFiles(conf);
- if (localFiles == null || localFiles.length < 2) {
- throw new IllegalArgumentException("missing paths from the DistributedCache");
- }
- alphaI = conf.getFloat(NaiveBayesTrainer.ALPHA_I, 1.0f);
- Path weightFile = new Path(localFiles[0].getPath());
- FileSystem fs = weightFile.getFileSystem(conf);
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, weightFile, conf);
- Writable key = new Text();
- VectorWritable value = new VectorWritable();
-
- while (reader.next(key, value)) {
- if (key.toString().equals(BayesConstants.FEATURE_SUM)) {
- featureSum = value.get();
- } else if (key.toString().equals(BayesConstants.LABEL_SUM)) {
- labelSum = value.get();
- }
+ URI[] localFiles = DistributedCache.getCacheFiles(conf);
+ if (localFiles == null || localFiles.length < 2) {
+ throw new IllegalArgumentException("missing paths from the DistributedCache");
+ }
+ alphaI = conf.getFloat(NaiveBayesTrainer.ALPHA_I, 1.0f);
+ Path weightFile = new Path(localFiles[0].getPath());
+ for (Pair<Text,VectorWritable> record :
+ new SequenceFileIterable<Text,VectorWritable>(weightFile, true, conf)) {
+ Text key = record.getFirst();
+ VectorWritable value = record.getSecond();
+ if (key.toString().equals(BayesConstants.FEATURE_SUM)) {
+ featureSum = value.get();
+ } else if (key.toString().equals(BayesConstants.LABEL_SUM)) {
+ labelSum = value.get();
}
- perLabelThetaNormalizer = labelSum.like();
- totalSum = labelSum.zSum();
- vocabCount = featureSum.getNumNondefaultElements();
-
- Path labelMapFile = new Path(localFiles[1].getPath());
- fs = labelMapFile.getFileSystem(conf);
-
- reader.close();
- reader = new SequenceFile.Reader(fs, labelMapFile, conf);
- IntWritable intValue = new IntWritable();
+ }
+ perLabelThetaNormalizer = labelSum.like();
+ totalSum = labelSum.zSum();
+ vocabCount = featureSum.getNumNondefaultElements();
- // key is word value is id
- while (reader.next(key, intValue)) {
- labelMap.put(key.toString(), intValue.get());
- }
- } catch (IOException e) {
- throw new IllegalStateException(e);
+ Path labelMapFile = new Path(localFiles[1].getPath());
+ // key is word value is id
+ for (Pair<Writable,IntWritable> record :
+ new SequenceFileIterable<Writable,IntWritable>(labelMapFile, true, conf)) {
+ labelMap.put(record.getFirst().toString(), record.getSecond().get());
}
}