You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by gs...@apache.org on 2013/06/09 22:51:19 UTC

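svn commit: r1491294 - in /mahout/trunk: core/src/main/java/org/apache/mahout/classifier/df/mapreduce/ core/src/main/java/org/apache/mahout/clustering/spectral/common/ core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ core/src/main/java/org/apache/mahout/math/hadoop/ core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ core/src/main/java/org/apache/mahout/vectorizer/pruner/ core/src/main/java/org/apache/mahout/vectorizer/tfidf/ examples/src/main/java/org/apache/mahout/cf/taste/example/email/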

Author: gsingers
Date: Sun Jun  9 20:51:18 2013
New Revision: 1491294

URL: http://svn.apache.org/r1491294
Log:
MAHOUT-992: more cleanup on DistCache usage

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java
    mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java Sun Jun  9 20:51:18 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -203,11 +204,17 @@ public class Classifier {
       if (files == null || files.length < 2) {
         throw new IOException("not enough paths in the DistributedCache");
       }
-      
+      LocalFileSystem localFs = FileSystem.getLocal(conf);
+      if (!localFs.exists(files[0])) {//MAHOUT-992: this seems safe
+        files[0] = localFs.makeQualified(new Path(DistributedCache.getCacheFiles(conf)[0].getPath()));
+      }
+
       dataset = Dataset.load(conf, files[0]);
 
       converter = new DataConverter(dataset);
-
+      if (!localFs.exists(files[1])) {//MAHOUT-992: this seems safe
+        files[1] = localFs.makeQualified(new Path(DistributedCache.getCacheFiles(conf)[1].getPath()));
+      }
       forest = DecisionForest.load(conf, files[1]);
       if (forest == null) {
         throw new InterruptedException("DecisionForest not found!");

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java Sun Jun  9 20:51:18 2013
@@ -97,6 +97,7 @@ public final class VectorCache {
    */
   public static Vector load(Configuration conf) throws IOException {
     Path[] files = DistributedCache.getLocalCacheFiles(conf);
+    LocalFileSystem localFs = FileSystem.getLocal(conf);
     if (files == null || files.length < 1) {
       log.debug("getLocalCacheFiles failed, trying getCacheFiles");
       URI[] filesURIs = DistributedCache.getCacheFiles(conf);
@@ -104,20 +105,19 @@ public final class VectorCache {
         throw new IOException("Cannot read Frequency list from Distributed Cache");
       }
       if (filesURIs.length != 1) {
-        throw new IOException("Cannot read Frequency list from Distributed Cache (" + files.length + ')');
+        throw new IOException("Cannot read Frequency list from Distributed Cache (" + filesURIs.length + ')');
       }
       files = new Path[1];
       files[0] = new Path(filesURIs[0].getPath());
     } else {
       // Fallback if we are running locally.
-      LocalFileSystem localFs = FileSystem.getLocal(conf);
       if (!localFs.exists(files[0])) {
         URI[] filesURIs = DistributedCache.getCacheFiles(conf);
         if (filesURIs == null) {
           throw new IOException("Cannot read Frequency list from Distributed Cache");
         }
         if (filesURIs.length != 1) {
-          throw new IOException("Cannot read Frequency list from Distributed Cache (" + files.length + ')');
+          throw new IOException("Cannot read Frequency list from Distributed Cache (" + filesURIs.length + ')');
         }
         files[0] = new Path(filesURIs[0].getPath());
       }
@@ -126,7 +126,7 @@ public final class VectorCache {
     if (log.isInfoEnabled()) {
       log.info("Files are: {}", Arrays.toString(files));
     }
-
+    files[0] = localFs.makeQualified(files[0]);
     return load(conf, files[0]);
   }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Sun Jun  9 20:51:18 2013
@@ -106,6 +106,7 @@ public final class PFPGrowth {
       }
       fListLocalPath = new Path(filesURIs[0].getPath());
     }
+    fListLocalPath = fs.makeQualified(fListLocalPath);
     for (Pair<Text,LongWritable> record
          : new SequenceFileIterable<Text,LongWritable>(fListLocalPath, true, conf)) {
       list.add(new Pair<String,Long>(record.getFirst().toString(), record.getSecond().get()));

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java Sun Jun  9 20:51:18 2013
@@ -214,7 +214,13 @@ public final class TimesSquaredJob {
         Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
         Preconditions.checkArgument(localFiles != null && localFiles.length >= 1,
                                     "missing paths from the DistributedCache");
-        Path inputVectorPath = localFiles[0];//nocommit: is this the right pattern for use here?
+        FileSystem fs = FileSystem.getLocal(conf);
+        Path inputVectorPath;
+        if (fs.exists(localFiles[0])) {
+          inputVectorPath = fs.makeQualified(localFiles[0]);
+        } else {//MAHOUT-992: this seems safe
+          inputVectorPath = fs.makeQualified(new Path(DistributedCache.getCacheFiles(conf)[0].getPath()));
+        }
 
         SequenceFileValueIterator<VectorWritable> iterator =
             new SequenceFileValueIterator<VectorWritable>(inputVectorPath, true, conf);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java Sun Jun  9 20:51:18 2013
@@ -211,7 +211,7 @@ public final class BtJob {
 
         Validate.notNull(rFiles,
                          "no RHat files in distributed cache job definition");
-
+        //TODO: this probably can be replaced w/ local fs makeQualified
         Configuration lconf = new Configuration();
         lconf.set("fs.default.name", "file:///");
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java Sun Jun  9 20:51:18 2013
@@ -19,6 +19,7 @@ package org.apache.mahout.vectorizer.pru
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -32,7 +33,6 @@ import org.apache.mahout.math.map.OpenIn
 import org.apache.mahout.vectorizer.HighDFWordsPruner;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.Iterator;
 
 public class WordsPrunerReducer extends
@@ -44,7 +44,7 @@ public class WordsPrunerReducer extends
 
   @Override
   protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context context)
-    throws IOException, InterruptedException {
+          throws IOException, InterruptedException {
     Iterator<VectorWritable> it = values.iterator();
     if (!it.hasNext()) {
       return;
@@ -78,11 +78,16 @@ public class WordsPrunerReducer extends
 
     maxDf = conf.getLong(HighDFWordsPruner.MAX_DF, Long.MAX_VALUE);
     minDf = conf.getLong(HighDFWordsPruner.MIN_DF, -1);
-
-    Path dictionaryFile = localFiles[0];
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path dictionaryFile;
+    if (fs.exists(localFiles[0])) {
+      dictionaryFile = fs.makeQualified(localFiles[0]);
+    } else {//MAHOUT-992: this seems safe
+      dictionaryFile = fs.makeQualified(new Path(DistributedCache.getCacheFiles(conf)[0].getPath()));
+    }
     // key is feature, value is the document frequency
     for (Pair<IntWritable, LongWritable> record
-        : new SequenceFileIterable<IntWritable, LongWritable>(dictionaryFile, true, conf)) {
+            : new SequenceFileIterable<IntWritable, LongWritable>(dictionaryFile, true, conf)) {
       dictionary.put(record.getFirst().get(), record.getSecond().get());
     }
   }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java Sun Jun  9 20:51:18 2013
@@ -123,7 +123,7 @@ public class TFIDFPartialVectorReducer e
     sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
     namedVector = conf.getBoolean(PartialVectorMerger.NAMED_VECTOR, false);
 
-    Path dictionaryFile = localFiles[0];
+    Path dictionaryFile = localFs.makeQualified(localFiles[0]);
     // key is feature, value is the document frequency
     for (Pair<IntWritable,LongWritable> record 
          : new SequenceFileIterable<IntWritable,LongWritable>(dictionaryFile, true, conf)) {

Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java?rev=1491294&r1=1491293&r2=1491294&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java Sun Jun  9 20:51:18 2013
@@ -20,6 +20,7 @@ package org.apache.mahout.cf.taste.examp
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
@@ -66,6 +67,7 @@ public final class EmailUtility {
     Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
     Preconditions.checkArgument(localFiles != null,
             "missing paths from the DistributedCache");
+    FileSystem fs = FileSystem.getLocal(conf);
     for (Path dictionaryFile : localFiles) {
 
       // key is word value is id
@@ -77,6 +79,7 @@ public final class EmailUtility {
         dictionary = msgIdDictionary;
       }
       if (dictionary != null) {
+        dictionaryFile = fs.makeQualified(dictionaryFile);
         for (Pair<Writable, IntWritable> record
             : new SequenceFileIterable<Writable, IntWritable>(dictionaryFile, true, conf)) {
           dictionary.put(record.getFirst().toString(), record.getSecond().get());