You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by gs...@apache.org on 2013/06/09 22:51:19 UTC
svn commit: r1491294 - in /mahout/trunk:
core/src/main/java/org/apache/mahout/classifier/df/mapreduce/
core/src/main/java/org/apache/mahout/clustering/spectral/common/
core/src/main/java/org/apache/mahout/fpm/pfpgrowth/
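core/src/main/java/org/apache/mahout/math/hadoop/
core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/
core/src/main/java/org/apache/mahout/vectorizer/pruner/
core/src/main/java/org/apache/mahout/vectorizer/tfidf/
examples/src/main/java/org/apache/mahout/cf/taste/example/email/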
Author: gsingers
Date: Sun Jun 9 20:51:18 2013
New Revision: 1491294
URL: http://svn.apache.org/r1491294
Log:
MAHOUT-992: more cleanup on DistCache usage
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java Sun Jun 9 20:51:18 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
@@ -203,11 +204,17 @@ public class Classifier {
if (files == null || files.length < 2) {
throw new IOException("not enough paths in the DistributedCache");
}
-
+ LocalFileSystem localFs = FileSystem.getLocal(conf);
+ if (!localFs.exists(files[0])) {//MAHOUT-992: this seems safe
+ files[0] = localFs.makeQualified(new Path(DistributedCache.getCacheFiles(conf)[0].getPath()));
+ }
+
dataset = Dataset.load(conf, files[0]);
converter = new DataConverter(dataset);
-
+ if (!localFs.exists(files[1])) {//MAHOUT-992: this seems safe
+ files[1] = localFs.makeQualified(new Path(DistributedCache.getCacheFiles(conf)[1].getPath()));
+ }
forest = DecisionForest.load(conf, files[1]);
if (forest == null) {
throw new InterruptedException("DecisionForest not found!");
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java Sun Jun 9 20:51:18 2013
@@ -97,6 +97,7 @@ public final class VectorCache {
*/
public static Vector load(Configuration conf) throws IOException {
Path[] files = DistributedCache.getLocalCacheFiles(conf);
+ LocalFileSystem localFs = FileSystem.getLocal(conf);
if (files == null || files.length < 1) {
log.debug("getLocalCacheFiles failed, trying getCacheFiles");
URI[] filesURIs = DistributedCache.getCacheFiles(conf);
@@ -104,20 +105,19 @@ public final class VectorCache {
throw new IOException("Cannot read Frequency list from Distributed Cache");
}
if (filesURIs.length != 1) {
- throw new IOException("Cannot read Frequency list from Distributed Cache (" + files.length + ')');
+ throw new IOException("Cannot read Frequency list from Distributed Cache (" + filesURIs.length + ')');
}
files = new Path[1];
files[0] = new Path(filesURIs[0].getPath());
} else {
// Fallback if we are running locally.
- LocalFileSystem localFs = FileSystem.getLocal(conf);
if (!localFs.exists(files[0])) {
URI[] filesURIs = DistributedCache.getCacheFiles(conf);
if (filesURIs == null) {
throw new IOException("Cannot read Frequency list from Distributed Cache");
}
if (filesURIs.length != 1) {
- throw new IOException("Cannot read Frequency list from Distributed Cache (" + files.length + ')');
+ throw new IOException("Cannot read Frequency list from Distributed Cache (" + filesURIs.length + ')');
}
files[0] = new Path(filesURIs[0].getPath());
}
@@ -126,7 +126,7 @@ public final class VectorCache {
if (log.isInfoEnabled()) {
log.info("Files are: {}", Arrays.toString(files));
}
-
+ files[0] = localFs.makeQualified(files[0]);
return load(conf, files[0]);
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Sun Jun 9 20:51:18 2013
@@ -106,6 +106,7 @@ public final class PFPGrowth {
}
fListLocalPath = new Path(filesURIs[0].getPath());
}
+ fListLocalPath = fs.makeQualified(fListLocalPath);
for (Pair<Text,LongWritable> record
: new SequenceFileIterable<Text,LongWritable>(fListLocalPath, true, conf)) {
list.add(new Pair<String,Long>(record.getFirst().toString(), record.getSecond().get()));
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java Sun Jun 9 20:51:18 2013
@@ -214,7 +214,13 @@ public final class TimesSquaredJob {
Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
Preconditions.checkArgument(localFiles != null && localFiles.length >= 1,
"missing paths from the DistributedCache");
- Path inputVectorPath = localFiles[0];//nocommit: is this the right pattern for use here?
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path inputVectorPath;
+ if (fs.exists(localFiles[0])) {
+ inputVectorPath = fs.makeQualified(localFiles[0]);
+ } else {//MAHOUT-992: this seems safe
+ inputVectorPath = fs.makeQualified(new Path(DistributedCache.getCacheFiles(conf)[0].getPath()));
+ }
SequenceFileValueIterator<VectorWritable> iterator =
new SequenceFileValueIterator<VectorWritable>(inputVectorPath, true, conf);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java Sun Jun 9 20:51:18 2013
@@ -211,7 +211,7 @@ public final class BtJob {
Validate.notNull(rFiles,
"no RHat files in distributed cache job definition");
-
+ //TODO: this probably can be replaced w/ local fs makeQualified
Configuration lconf = new Configuration();
lconf.set("fs.default.name", "file:///");
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java Sun Jun 9 20:51:18 2013
@@ -19,6 +19,7 @@ package org.apache.mahout.vectorizer.pru
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -32,7 +33,6 @@ import org.apache.mahout.math.map.OpenIn
import org.apache.mahout.vectorizer.HighDFWordsPruner;
import java.io.IOException;
-import java.net.URI;
import java.util.Iterator;
public class WordsPrunerReducer extends
@@ -44,7 +44,7 @@ public class WordsPrunerReducer extends
@Override
protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context context)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
Iterator<VectorWritable> it = values.iterator();
if (!it.hasNext()) {
return;
@@ -78,11 +78,16 @@ public class WordsPrunerReducer extends
maxDf = conf.getLong(HighDFWordsPruner.MAX_DF, Long.MAX_VALUE);
minDf = conf.getLong(HighDFWordsPruner.MIN_DF, -1);
-
- Path dictionaryFile = localFiles[0];
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path dictionaryFile;
+ if (fs.exists(localFiles[0])) {
+ dictionaryFile = fs.makeQualified(localFiles[0]);
+ } else {//MAHOUT-992: this seems safe
+ dictionaryFile = fs.makeQualified(new Path(DistributedCache.getCacheFiles(conf)[0].getPath()));
+ }
// key is feature, value is the document frequency
for (Pair<IntWritable, LongWritable> record
- : new SequenceFileIterable<IntWritable, LongWritable>(dictionaryFile, true, conf)) {
+ : new SequenceFileIterable<IntWritable, LongWritable>(dictionaryFile, true, conf)) {
dictionary.put(record.getFirst().get(), record.getSecond().get());
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java Sun Jun 9 20:51:18 2013
@@ -123,7 +123,7 @@ public class TFIDFPartialVectorReducer e
sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
namedVector = conf.getBoolean(PartialVectorMerger.NAMED_VECTOR, false);
- Path dictionaryFile = localFiles[0];
+ Path dictionaryFile = localFs.makeQualified(localFiles[0]);
// key is feature, value is the document frequency
for (Pair<IntWritable,LongWritable> record
: new SequenceFileIterable<IntWritable,LongWritable>(dictionaryFile, true, conf)) {
Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java Sun Jun 9 20:51:18 2013
@@ -20,6 +20,7 @@ package org.apache.mahout.cf.taste.examp
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
@@ -66,6 +67,7 @@ public final class EmailUtility {
Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
Preconditions.checkArgument(localFiles != null,
"missing paths from the DistributedCache");
+ FileSystem fs = FileSystem.getLocal(conf);
for (Path dictionaryFile : localFiles) {
// key is word value is id
@@ -77,6 +79,7 @@ public final class EmailUtility {
dictionary = msgIdDictionary;
}
if (dictionary != null) {
+ dictionaryFile = fs.makeQualified(dictionaryFile);
for (Pair<Writable, IntWritable> record
: new SequenceFileIterable<Writable, IntWritable>(dictionaryFile, true, conf)) {
dictionary.put(record.getFirst().toString(), record.getSecond().get());