You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ss...@apache.org on 2013/06/09 23:43:56 UTC
svn commit: r1491301 - in /mahout/trunk:
core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/
core/src/main/java/org/apache/mahout/classifier/df/mapreduce/
core/src/main/java/org/apache/mahout/classifier/naivebayes/
core/src/main/java/org/apache/m...
Author: ssc
Date: Sun Jun 9 21:43:55 2013
New Revision: 1491301
URL: http://svn.apache.org/r1491301
Log:
MAHOUT-992 Audit DistributedCache use to support EMR
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Builder.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
mahout/trunk/core/src/test/java/org/apache/mahout/classifier/df/data/Utils.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java Sun Jun 9 21:43:55 2013
@@ -21,12 +21,12 @@ import com.google.common.base.Preconditi
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
@@ -71,23 +71,18 @@ final class ALS {
IntWritable rowIndex = new IntWritable();
VectorWritable row = new VectorWritable();
- LocalFileSystem localFs = FileSystem.getLocal(conf);
- Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
OpenIntObjectHashMap<Vector> featureMatrix = numEntities > 0
? new OpenIntObjectHashMap<Vector>(numEntities) : new OpenIntObjectHashMap<Vector>();
- for (int n = 0; n < cacheFiles.length; n++) {
- Path localCacheFile = localFs.makeQualified(cacheFiles[n]);
+ Path[] cachedFiles = HadoopUtil.getCachedFiles(conf);
+ LocalFileSystem localFs = FileSystem.getLocal(conf);
- // fallback for local execution
- if (!localFs.exists(localCacheFile)) {//MAHOUT-992: this seems safe
- localCacheFile = new Path(DistributedCache.getCacheFiles(conf)[n].getPath());
- }
+ for (int n = 0; n < cachedFiles.length; n++) {
SequenceFile.Reader reader = null;
try {
- reader = new SequenceFile.Reader(localFs, localCacheFile, conf);
+ reader = new SequenceFile.Reader(localFs, cachedFiles[n], conf);
while (reader.next(rowIndex, row)) {
featureMatrix.put(rowIndex.get(), row.get());
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Builder.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Builder.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Builder.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Builder.java Sun Jun 9 21:43:55 2013
@@ -33,7 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.URI;
import java.util.Arrays;
import java.util.Comparator;
@@ -200,9 +199,9 @@ public abstract class Builder {
* if no path is found
*/
public static Path getDistributedCacheFile(Configuration conf, int index) throws IOException {
- Path[] files = DistributedCache.getLocalCacheFiles(conf);
+ Path[] files = HadoopUtil.getCachedFiles(conf);
- if (files == null || files.length <= index) {
+ if (files.length <= index) {
throw new IOException("path not found in the DistributedCache");
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java Sun Jun 9 21:43:55 2013
@@ -199,9 +199,9 @@ public class Classifier {
Configuration conf = context.getConfiguration();
- Path[] files = DistributedCache.getLocalCacheFiles(conf);
+ Path[] files = HadoopUtil.getCachedFiles(conf);
- if (files == null || files.length < 2) {
+ if (files.length < 2) {
throw new IOException("not enough paths in the DistributedCache");
}
LocalFileSystem localFs = FileSystem.getLocal(conf);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java Sun Jun 9 21:43:55 2013
@@ -143,7 +143,7 @@ public final class BayesUtils {
public static OpenObjectIntHashMap<String> readIndexFromCache(Configuration conf) throws IOException {
OpenObjectIntHashMap<String> index = new OpenObjectIntHashMap<String>();
for (Pair<Writable,IntWritable> entry
- : new SequenceFileIterable<Writable,IntWritable>(HadoopUtil.cachedFile(conf), conf)) {
+ : new SequenceFileIterable<Writable,IntWritable>(HadoopUtil.getSingleCachedFile(conf), conf)) {
index.put(entry.getFirst().toString(), entry.getSecond().get());
}
return index;
@@ -152,7 +152,7 @@ public final class BayesUtils {
public static Map<String,Vector> readScoresFromCache(Configuration conf) throws IOException {
Map<String,Vector> sumVectors = Maps.newHashMap();
for (Pair<Text,VectorWritable> entry
- : new SequenceFileDirIterable<Text,VectorWritable>(HadoopUtil.cachedFile(conf),
+ : new SequenceFileDirIterable<Text,VectorWritable>(HadoopUtil.getSingleCachedFile(conf),
PathType.LIST, PathFilters.partFilter(), conf)) {
sumVectors.put(entry.getFirst().toString(), entry.getSecond().get());
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java Sun Jun 9 21:43:55 2013
@@ -47,7 +47,7 @@ public class BayesTestMapper extends Map
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
- Path modelPath = HadoopUtil.cachedFile(conf);
+ Path modelPath = HadoopUtil.getSingleCachedFile(conf);
NaiveBayesModel model = NaiveBayesModel.materialize(modelPath, conf);
boolean compl = Boolean.parseBoolean(conf.get(TestNaiveBayesDriver.COMPLEMENTARY));
if (compl) {
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java Sun Jun 9 21:43:55 2013
@@ -25,7 +25,6 @@ import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
@@ -96,37 +95,15 @@ public final class VectorCache {
* Loads the vector from {@link DistributedCache}. Returns null if no vector exists.
*/
public static Vector load(Configuration conf) throws IOException {
- Path[] files = DistributedCache.getLocalCacheFiles(conf);
- LocalFileSystem localFs = FileSystem.getLocal(conf);
- if (files == null || files.length < 1) {
- log.debug("getLocalCacheFiles failed, trying getCacheFiles");
- URI[] filesURIs = DistributedCache.getCacheFiles(conf);
- if (filesURIs == null) {
- throw new IOException("Cannot read Frequency list from Distributed Cache");
- }
- if (filesURIs.length != 1) {
- throw new IOException("Cannot read Frequency list from Distributed Cache (" + filesURIs.length + ')');
- }
- files = new Path[1];
- files[0] = new Path(filesURIs[0].getPath());
- } else {
- // Fallback if we are running locally.
- if (!localFs.exists(files[0])) {
- URI[] filesURIs = DistributedCache.getCacheFiles(conf);
- if (filesURIs == null) {
- throw new IOException("Cannot read Frequency list from Distributed Cache");
- }
- if (filesURIs.length != 1) {
- throw new IOException("Cannot read Frequency list from Distributed Cache (" + filesURIs.length + ')');
- }
- files[0] = new Path(filesURIs[0].getPath());
- }
+ Path[] files = HadoopUtil.getCachedFiles(conf);
+
+ if (files.length != 1) {
+ throw new IOException("Cannot read Frequency list from Distributed Cache (" + files.length + ')');
}
if (log.isInfoEnabled()) {
log.info("Files are: {}", Arrays.toString(files));
}
- files[0] = localFs.makeQualified(files[0]);
return load(conf, files[0]);
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java Sun Jun 9 21:43:55 2013
@@ -25,6 +25,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
+import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
@@ -32,6 +33,7 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
@@ -272,9 +274,46 @@ public final class HadoopUtil {
* @return
* @throws IOException
*/
- public static Path cachedFile(Configuration conf) throws IOException {
+ public static Path getSingleCachedFile(Configuration conf) throws IOException {
+ return getCachedFiles(conf)[0];
+ }
+
+ /**
+ * Retrieves the paths of all files registered in the DistributedCache, preferring their local copies.
+ * @param conf job configuration whose DistributedCache entries are read
+ * @return one path per cache entry: the qualified local copy if it exists, else the path part of the cache URI
+ * @throws IOException if the local file system or the DistributedCache metadata cannot be accessed
+ * @throws IllegalStateException if no cache files are found
+ */
+ public static Path[] getCachedFiles(Configuration conf) throws IOException {
+ LocalFileSystem localFs = FileSystem.getLocal(conf);
Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
- return cacheFiles != null && cacheFiles.length > 0 ? cacheFiles[0] : null;
+
+ URI[] fallbackFiles = DistributedCache.getCacheFiles(conf);
+
+ // no localized copies at all (e.g. local execution): use the path component of each registered cache URI
+ if (cacheFiles == null) {
+
+ Preconditions.checkState(fallbackFiles != null, "Unable to find cached files!");
+
+ cacheFiles = new Path[fallbackFiles.length];
+ for (int n = 0; n < fallbackFiles.length; n++) {
+ cacheFiles[n] = new Path(fallbackFiles[n].getPath());
+ }
+ } else {
+
+ for (int n = 0; n < cacheFiles.length; n++) {
+ cacheFiles[n] = localFs.makeQualified(cacheFiles[n]);
+ // local copy missing: use n-th cache URI; NOTE(review): fallbackFiles null/length unchecked here (NPE/AIOOBE risk)
+ if (!localFs.exists(cacheFiles[n])) {
+ cacheFiles[n] = new Path(fallbackFiles[n].getPath());
+ }
+ }
+ }
+
+ Preconditions.checkState(cacheFiles.length > 0, "Unable to find cached files!");
+
+ return cacheFiles;
}
public static void setSerializations(Configuration conf) {
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Sun Jun 9 21:43:55 2013
@@ -18,7 +18,6 @@
package org.apache.mahout.fpm.pfpgrowth;
import java.io.IOException;
-import java.net.URI;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
@@ -86,29 +85,14 @@ public final class PFPGrowth {
*/
public static List<Pair<String,Long>> readFList(Configuration conf) throws IOException {
List<Pair<String,Long>> list = Lists.newArrayList();
- Path[] files = DistributedCache.getLocalCacheFiles(conf);
- if (files == null) {
- throw new IOException("Cannot read Frequency list from Distributed Cache");
- }
+
+ Path[] files = HadoopUtil.getCachedFiles(conf);
if (files.length != 1) {
throw new IOException("Cannot read Frequency list from Distributed Cache (" + files.length + ')');
}
- FileSystem fs = FileSystem.getLocal(conf);
- Path fListLocalPath = fs.makeQualified(files[0]);
- // Fallback if we are running locally.
- if (!fs.exists(fListLocalPath)) {
- URI[] filesURIs = DistributedCache.getCacheFiles(conf);
- if (filesURIs == null) {
- throw new IOException("Cannot read Frequency list from Distributed Cache");
- }
- if (filesURIs.length != 1) {
- throw new IOException("Cannot read Frequency list from Distributed Cache (" + files.length + ')');
- }
- fListLocalPath = new Path(filesURIs[0].getPath());
- }
- fListLocalPath = fs.makeQualified(fListLocalPath);
+
for (Pair<Text,LongWritable> record
- : new SequenceFileIterable<Text,LongWritable>(fListLocalPath, true, conf)) {
+ : new SequenceFileIterable<Text,LongWritable>(files[0], true, conf)) {
list.add(new Pair<String,Long>(record.getFirst().toString(), record.getSecond().get()));
}
return list;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java Sun Jun 9 21:43:55 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.RandomAccessSparseVector;
@@ -214,13 +215,8 @@ public final class TimesSquaredJob {
Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
Preconditions.checkArgument(localFiles != null && localFiles.length >= 1,
"missing paths from the DistributedCache");
- FileSystem fs = FileSystem.getLocal(conf);
- Path inputVectorPath;
- if (fs.exists(localFiles[0])) {
- inputVectorPath = fs.makeQualified(localFiles[0]);
- } else {//MAHOUT-992: this seems safe
- inputVectorPath = fs.makeQualified(new Path(DistributedCache.getCacheFiles(conf)[0].getPath()));
- }
+
+ Path inputVectorPath = HadoopUtil.getSingleCachedFile(conf);
SequenceFileValueIterator<VectorWritable> iterator =
new SequenceFileValueIterator<VectorWritable>(inputVectorPath, true, conf);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java Sun Jun 9 21:43:55 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.lib.i
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.IOUtils;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.PathType;
@@ -322,7 +323,7 @@ public final class ABtDenseOutJob {
blockHeight = conf.getInt(BtJob.PROP_OUTER_PROD_BLOCK_HEIGHT, -1);
distributedBt = conf.get(PROP_BT_BROADCAST) != null;
if (distributedBt) {
- btLocalPath = DistributedCache.getLocalCacheFiles(conf);
+ btLocalPath = HadoopUtil.getCachedFiles(conf);
localFsConfig = new Configuration();
localFsConfig.set("fs.default.name", "file:///");
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java Sun Jun 9 21:43:55 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.lib.i
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.IOUtils;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.PathType;
@@ -211,8 +212,7 @@ public final class ABtJob {
if (distributedBt) {
- Path[] btFiles =
- DistributedCache.getLocalCacheFiles(context.getConfiguration());
+ Path[] btFiles = HadoopUtil.getCachedFiles(context.getConfiguration());
// DEBUG: stdout
//System.out.printf("list of files: " + btFiles);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java Sun Jun 9 21:43:55 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.lib.i
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.IOUtils;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
@@ -207,7 +208,7 @@ public final class BtJob {
boolean distributedRHat = conf.get(PROP_RHAT_BROADCAST) != null;
if (distributedRHat) {
- Path[] rFiles = DistributedCache.getLocalCacheFiles(conf);
+ Path[] rFiles = HadoopUtil.getCachedFiles(conf);
Validate.notNull(rFiles,
"no RHat files in distributed cache job definition");
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java Sun Jun 9 21:43:55 2013
@@ -16,15 +16,13 @@ package org.apache.mahout.vectorizer.pru
* limitations under the License.
*/
-import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
import org.apache.mahout.math.Vector;
@@ -72,19 +70,13 @@ public class WordsPrunerReducer extends
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
- Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
- Preconditions.checkArgument(localFiles != null && localFiles.length >= 1,
- "missing paths from the DistributedCache");
+ Path[] localFiles = HadoopUtil.getCachedFiles(conf);
maxDf = conf.getLong(HighDFWordsPruner.MAX_DF, Long.MAX_VALUE);
minDf = conf.getLong(HighDFWordsPruner.MIN_DF, -1);
- FileSystem fs = FileSystem.getLocal(conf);
- Path dictionaryFile;
- if (fs.exists(localFiles[0])) {
- dictionaryFile = fs.makeQualified(localFiles[0]);
- } else {//MAHOUT-992: this seems safe
- dictionaryFile = fs.makeQualified(new Path(DistributedCache.getCacheFiles(conf)[0].getPath()));
- }
+
+ Path dictionaryFile = HadoopUtil.getSingleCachedFile(conf);
+
// key is feature, value is the document frequency
for (Pair<IntWritable, LongWritable> record
: new SequenceFileIterable<IntWritable, LongWritable>(dictionaryFile, true, conf)) {
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java Sun Jun 9 21:43:55 2013
@@ -17,12 +17,8 @@
package org.apache.mahout.vectorizer.term;
-import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
@@ -30,6 +26,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.lucene.analysis.shingle.ShingleFilter;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.StringTuple;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
@@ -46,7 +43,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.URI;
import java.util.Arrays;
import java.util.Iterator;
@@ -121,34 +117,17 @@ public class TFPartialVectorReducer exte
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
- Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
- Preconditions.checkArgument(localFiles != null && localFiles.length >= 1,
- "missing paths from the DistributedCache");
- LocalFileSystem localFs = FileSystem.getLocal(conf);
- if (!localFs.exists(localFiles[0])) {
- log.info("Can't find dictionary dist. cache file, looking in .getCacheFiles");
- URI[] filesURIs = DistributedCache.getCacheFiles(conf);
- if (filesURIs == null) {
- throw new IOException("Cannot read Frequency list from Distributed Cache");
- }
- if (filesURIs.length != 1) {
- throw new IOException("Cannot read Frequency list from Distributed Cache (" + localFiles.length + ')');
- }
- localFiles[0] = new Path(filesURIs[0].getPath());
- }
dimension = conf.getInt(PartialVectorMerger.DIMENSION, Integer.MAX_VALUE);
sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
namedVector = conf.getBoolean(PartialVectorMerger.NAMED_VECTOR, false);
maxNGramSize = conf.getInt(DictionaryVectorizer.MAX_NGRAMS, maxNGramSize);
- if (log.isInfoEnabled()) {
- log.info("Cache Files: " + Arrays.asList(localFiles));
- }
+
//MAHOUT-1247
- localFiles[0] = localFs.makeQualified(localFiles[0]);
+ Path dictionaryFile = HadoopUtil.getSingleCachedFile(conf);
// key is word value is id
for (Pair<Writable, IntWritable> record
- : new SequenceFileIterable<Writable, IntWritable>(localFiles[0], true, conf)) {
+ : new SequenceFileIterable<Writable, IntWritable>(dictionaryFile, true, conf)) {
dictionary.put(record.getFirst().toString(), record.getSecond().get());
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java Sun Jun 9 21:43:55 2013
@@ -18,19 +18,15 @@
package org.apache.mahout.vectorizer.tfidf;
import java.io.IOException;
-import java.net.URI;
import java.util.Iterator;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
import org.apache.mahout.math.NamedVector;
@@ -102,20 +98,7 @@ public class TFIDFPartialVectorReducer e
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
- Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
- Preconditions.checkArgument(localFiles != null && localFiles.length >= 1,
- "missing paths from the DistributedCache");
- LocalFileSystem localFs = FileSystem.getLocal(conf);
- if (!localFs.exists(localFiles[0])) {
- URI[] filesURIs = DistributedCache.getCacheFiles(conf);
- if (filesURIs == null) {
- throw new IOException("Cannot read Frequency list from Distributed Cache");
- }
- if (filesURIs.length != 1) {
- throw new IOException("Cannot read Frequency list from Distributed Cache (" + localFiles.length + ')');
- }
- localFiles[0] = new Path(filesURIs[0].getPath());
- }
+
vectorCount = conf.getLong(TFIDFConverter.VECTOR_COUNT, 1);
featureCount = conf.getLong(TFIDFConverter.FEATURE_COUNT, 1);
minDf = conf.getInt(TFIDFConverter.MIN_DF, 1);
@@ -123,7 +106,7 @@ public class TFIDFPartialVectorReducer e
sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
namedVector = conf.getBoolean(PartialVectorMerger.NAMED_VECTOR, false);
- Path dictionaryFile = localFs.makeQualified(localFiles[0]);
+ Path dictionaryFile = HadoopUtil.getSingleCachedFile(conf);
// key is feature, value is the document frequency
for (Pair<IntWritable,LongWritable> record
: new SequenceFileIterable<IntWritable,LongWritable>(dictionaryFile, true, conf)) {
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/classifier/df/data/Utils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/classifier/df/data/Utils.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/classifier/df/data/Utils.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/classifier/df/data/Utils.java Sun Jun 9 21:43:55 2013
@@ -28,7 +28,6 @@ import com.google.common.io.Closeables;
import com.google.common.io.Files;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.classifier.df.data.Dataset.Attribute;
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java Sun Jun 9 21:43:55 2013
@@ -20,7 +20,6 @@ package org.apache.mahout.clustering.str
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java Sun Jun 9 21:43:55 2013
@@ -17,19 +17,17 @@
package org.apache.mahout.cf.taste.example.email;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
import org.apache.mahout.math.map.OpenObjectIntHashMap;
import java.io.IOException;
-import java.net.URI;
import java.util.regex.Pattern;
public final class EmailUtility {
@@ -64,9 +62,7 @@ public final class EmailUtility {
String msgIdPrefix,
OpenObjectIntHashMap<String> msgIdDictionary) throws IOException {
- Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
- Preconditions.checkArgument(localFiles != null,
- "missing paths from the DistributedCache");
+ Path[] localFiles = HadoopUtil.getCachedFiles(conf);
FileSystem fs = FileSystem.getLocal(conf);
for (Path dictionaryFile : localFiles) {
Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java Sun Jun 9 21:43:55 2013
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.lucene.search.TermQuery;
import org.apache.mahout.common.HadoopUtil;
-import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterator;
@@ -34,7 +33,6 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.util.Iterator;
import java.util.List;
import static java.util.Arrays.asList;