You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this conversion.
Posted to commits@mahout.apache.org by ss...@apache.org on 2013/06/09 23:43:56 UTC

svn commit: r1491301 - in /mahout/trunk: core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ core/src/main/java/org/apache/mahout/classifier/df/mapreduce/ core/src/main/java/org/apache/mahout/classifier/naivebayes/ core/src/main/java/org/apache/m...

Author: ssc
Date: Sun Jun  9 21:43:55 2013
New Revision: 1491301

URL: http://svn.apache.org/r1491301
Log:
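MAHOUT-992: Audit DistributedCache use to support EMR (centralize cache-file lookup, including the local-execution fallback, in HadoopUtil.getCachedFiles()/getSingleCachedFile())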

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Builder.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
    mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
    mahout/trunk/core/src/test/java/org/apache/mahout/classifier/df/data/Utils.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java Sun Jun  9 21:43:55 2013
@@ -21,12 +21,12 @@ import com.google.common.base.Preconditi
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.iterator.sequencefile.PathFilters;
 import org.apache.mahout.common.iterator.sequencefile.PathType;
@@ -71,23 +71,18 @@ final class ALS {
     IntWritable rowIndex = new IntWritable();
     VectorWritable row = new VectorWritable();
 
-    LocalFileSystem localFs = FileSystem.getLocal(conf);
-    Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
 
     OpenIntObjectHashMap<Vector> featureMatrix = numEntities > 0
         ? new OpenIntObjectHashMap<Vector>(numEntities) : new OpenIntObjectHashMap<Vector>();
 
-    for (int n = 0; n < cacheFiles.length; n++) {
-      Path localCacheFile = localFs.makeQualified(cacheFiles[n]);
+    Path[] cachedFiles = HadoopUtil.getCachedFiles(conf);
+    LocalFileSystem localFs = FileSystem.getLocal(conf);
 
-      // fallback for local execution
-      if (!localFs.exists(localCacheFile)) {//MAHOUT-992: this seems safe
-        localCacheFile = new Path(DistributedCache.getCacheFiles(conf)[n].getPath());
-      }
+    for (int n = 0; n < cachedFiles.length; n++) {
 
       SequenceFile.Reader reader = null;
       try {
-        reader = new SequenceFile.Reader(localFs, localCacheFile, conf);
+        reader = new SequenceFile.Reader(localFs, cachedFiles[n], conf);
         while (reader.next(rowIndex, row)) {
           featureMatrix.put(rowIndex.get(), row.get());
         }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Builder.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Builder.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Builder.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Builder.java Sun Jun  9 21:43:55 2013
@@ -33,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Comparator;
 
@@ -200,9 +199,9 @@ public abstract class Builder {
    *           if no path is found
    */
   public static Path getDistributedCacheFile(Configuration conf, int index) throws IOException {
-    Path[] files = DistributedCache.getLocalCacheFiles(conf);
+    Path[] files = HadoopUtil.getCachedFiles(conf);
     
-    if (files == null || files.length <= index) {
+    if (files.length <= index) {
       throw new IOException("path not found in the DistributedCache");
     }
     

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/df/mapreduce/Classifier.java Sun Jun  9 21:43:55 2013
@@ -199,9 +199,9 @@ public class Classifier {
 
       Configuration conf = context.getConfiguration();
 
-      Path[] files = DistributedCache.getLocalCacheFiles(conf);
+      Path[] files = HadoopUtil.getCachedFiles(conf);
 
-      if (files == null || files.length < 2) {
+      if (files.length < 2) {
         throw new IOException("not enough paths in the DistributedCache");
       }
       LocalFileSystem localFs = FileSystem.getLocal(conf);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java Sun Jun  9 21:43:55 2013
@@ -143,7 +143,7 @@ public final class BayesUtils {
   public static OpenObjectIntHashMap<String> readIndexFromCache(Configuration conf) throws IOException {
     OpenObjectIntHashMap<String> index = new OpenObjectIntHashMap<String>();
     for (Pair<Writable,IntWritable> entry
-        : new SequenceFileIterable<Writable,IntWritable>(HadoopUtil.cachedFile(conf), conf)) {
+        : new SequenceFileIterable<Writable,IntWritable>(HadoopUtil.getSingleCachedFile(conf), conf)) {
       index.put(entry.getFirst().toString(), entry.getSecond().get());
     }
     return index;
@@ -152,7 +152,7 @@ public final class BayesUtils {
   public static Map<String,Vector> readScoresFromCache(Configuration conf) throws IOException {
     Map<String,Vector> sumVectors = Maps.newHashMap();
     for (Pair<Text,VectorWritable> entry
-        : new SequenceFileDirIterable<Text,VectorWritable>(HadoopUtil.cachedFile(conf),
+        : new SequenceFileDirIterable<Text,VectorWritable>(HadoopUtil.getSingleCachedFile(conf),
           PathType.LIST, PathFilters.partFilter(), conf)) {
       sumVectors.put(entry.getFirst().toString(), entry.getSecond().get());
     }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java Sun Jun  9 21:43:55 2013
@@ -47,7 +47,7 @@ public class BayesTestMapper extends Map
   protected void setup(Context context) throws IOException, InterruptedException {
     super.setup(context);
     Configuration conf = context.getConfiguration();
-    Path modelPath = HadoopUtil.cachedFile(conf);
+    Path modelPath = HadoopUtil.getSingleCachedFile(conf);
     NaiveBayesModel model = NaiveBayesModel.materialize(modelPath, conf);
     boolean compl = Boolean.parseBoolean(conf.get(TestNaiveBayesDriver.COMPLEMENTARY));
     if (compl) {

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java Sun Jun  9 21:43:55 2013
@@ -25,7 +25,6 @@ import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -96,37 +95,15 @@ public final class VectorCache {
    * Loads the vector from {@link DistributedCache}. Returns null if no vector exists.
    */
   public static Vector load(Configuration conf) throws IOException {
-    Path[] files = DistributedCache.getLocalCacheFiles(conf);
-    LocalFileSystem localFs = FileSystem.getLocal(conf);
-    if (files == null || files.length < 1) {
-      log.debug("getLocalCacheFiles failed, trying getCacheFiles");
-      URI[] filesURIs = DistributedCache.getCacheFiles(conf);
-      if (filesURIs == null) {
-        throw new IOException("Cannot read Frequency list from Distributed Cache");
-      }
-      if (filesURIs.length != 1) {
-        throw new IOException("Cannot read Frequency list from Distributed Cache (" + filesURIs.length + ')');
-      }
-      files = new Path[1];
-      files[0] = new Path(filesURIs[0].getPath());
-    } else {
-      // Fallback if we are running locally.
-      if (!localFs.exists(files[0])) {
-        URI[] filesURIs = DistributedCache.getCacheFiles(conf);
-        if (filesURIs == null) {
-          throw new IOException("Cannot read Frequency list from Distributed Cache");
-        }
-        if (filesURIs.length != 1) {
-          throw new IOException("Cannot read Frequency list from Distributed Cache (" + filesURIs.length + ')');
-        }
-        files[0] = new Path(filesURIs[0].getPath());
-      }
+    Path[] files = HadoopUtil.getCachedFiles(conf);
+
+    if (files.length != 1) {
+      throw new IOException("Cannot read Frequency list from Distributed Cache (" + files.length + ')');
     }
 
     if (log.isInfoEnabled()) {
       log.info("Files are: {}", Arrays.toString(files));
     }
-    files[0] = localFs.makeQualified(files[0]);
     return load(conf, files[0]);
   }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java Sun Jun  9 21:43:55 2013
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
 
+import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -32,6 +33,7 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Writable;
@@ -272,9 +274,46 @@ public final class HadoopUtil {
    * @return
    * @throws IOException
    */
-  public static Path cachedFile(Configuration conf) throws IOException {
+  public static Path getSingleCachedFile(Configuration conf) throws IOException {
+    return getCachedFiles(conf)[0];
+  }
+
+  /**
+   * Retrieves paths to cached files.
+   * @param conf
+   * @return
+   * @throws IOException
+   * @throws IllegalStateException if no cache files are found
+   */
+  public static Path[] getCachedFiles(Configuration conf) throws IOException {
+    LocalFileSystem localFs = FileSystem.getLocal(conf);
     Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
-    return cacheFiles != null && cacheFiles.length > 0 ? cacheFiles[0] : null;
+
+    URI[] fallbackFiles = DistributedCache.getCacheFiles(conf);
+
+    // fallback for local execution
+    if (cacheFiles == null) {
+
+      Preconditions.checkState(fallbackFiles != null, "Unable to find cached files!");
+
+      cacheFiles = new Path[fallbackFiles.length];
+      for (int n = 0; n < fallbackFiles.length; n++) {
+        cacheFiles[n] = new Path(fallbackFiles[n].getPath());
+      }
+    } else {
+
+      for (int n = 0; n < cacheFiles.length; n++) {
+        cacheFiles[n] = localFs.makeQualified(cacheFiles[n]);
+        // fallback for local execution
+        if (!localFs.exists(cacheFiles[n])) {
+          cacheFiles[n] = new Path(fallbackFiles[n].getPath());
+        }
+      }
+    }
+
+    Preconditions.checkState(cacheFiles.length > 0, "Unable to find cached files!");
+
+    return cacheFiles;
   }
 
   public static void setSerializations(Configuration conf) {

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Sun Jun  9 21:43:55 2013
@@ -18,7 +18,6 @@
 package org.apache.mahout.fpm.pfpgrowth;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -86,29 +85,14 @@ public final class PFPGrowth {
    */
   public static List<Pair<String,Long>> readFList(Configuration conf) throws IOException {
     List<Pair<String,Long>> list = Lists.newArrayList();
-    Path[] files = DistributedCache.getLocalCacheFiles(conf);
-    if (files == null) {
-      throw new IOException("Cannot read Frequency list from Distributed Cache");
-    }
+
+    Path[] files = HadoopUtil.getCachedFiles(conf);
     if (files.length != 1) {
       throw new IOException("Cannot read Frequency list from Distributed Cache (" + files.length + ')');
     }
-    FileSystem fs = FileSystem.getLocal(conf);
-    Path fListLocalPath = fs.makeQualified(files[0]);
-    // Fallback if we are running locally.
-    if (!fs.exists(fListLocalPath)) {
-      URI[] filesURIs = DistributedCache.getCacheFiles(conf);
-      if (filesURIs == null) {
-        throw new IOException("Cannot read Frequency list from Distributed Cache");
-      }
-      if (filesURIs.length != 1) {
-        throw new IOException("Cannot read Frequency list from Distributed Cache (" + files.length + ')');
-      }
-      fListLocalPath = new Path(filesURIs[0].getPath());
-    }
-    fListLocalPath = fs.makeQualified(fListLocalPath);
+
     for (Pair<Text,LongWritable> record
-         : new SequenceFileIterable<Text,LongWritable>(fListLocalPath, true, conf)) {
+         : new SequenceFileIterable<Text,LongWritable>(files[0], true, conf)) {
       list.add(new Pair<String,Long>(record.getFirst().toString(), record.getSecond().get()));
     }
     return list;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java Sun Jun  9 21:43:55 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
 import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.RandomAccessSparseVector;
@@ -214,13 +215,8 @@ public final class TimesSquaredJob {
         Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
         Preconditions.checkArgument(localFiles != null && localFiles.length >= 1,
                                     "missing paths from the DistributedCache");
-        FileSystem fs = FileSystem.getLocal(conf);
-        Path inputVectorPath;
-        if (fs.exists(localFiles[0])) {
-          inputVectorPath = fs.makeQualified(localFiles[0]);
-        } else {//MAHOUT-992: this seems safe
-          inputVectorPath = fs.makeQualified(new Path(DistributedCache.getCacheFiles(conf)[0].getPath()));
-        }
+
+        Path inputVectorPath = HadoopUtil.getSingleCachedFile(conf);
 
         SequenceFileValueIterator<VectorWritable> iterator =
             new SequenceFileValueIterator<VectorWritable>(inputVectorPath, true, conf);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java Sun Jun  9 21:43:55 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.IOUtils;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.iterator.sequencefile.PathType;
@@ -322,7 +323,7 @@ public final class ABtDenseOutJob {
       blockHeight = conf.getInt(BtJob.PROP_OUTER_PROD_BLOCK_HEIGHT, -1);
       distributedBt = conf.get(PROP_BT_BROADCAST) != null;
       if (distributedBt) {
-        btLocalPath = DistributedCache.getLocalCacheFiles(conf);
+        btLocalPath = HadoopUtil.getCachedFiles(conf);
         localFsConfig = new Configuration();
         localFsConfig.set("fs.default.name", "file:///");
       }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java Sun Jun  9 21:43:55 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.IOUtils;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.iterator.sequencefile.PathType;
@@ -211,8 +212,7 @@ public final class ABtJob {
 
       if (distributedBt) {
 
-        Path[] btFiles =
-          DistributedCache.getLocalCacheFiles(context.getConfiguration());
+        Path[] btFiles = HadoopUtil.getCachedFiles(context.getConfiguration());
 
         // DEBUG: stdout
         //System.out.printf("list of files: " + btFiles);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java Sun Jun  9 21:43:55 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.IOUtils;
 import org.apache.mahout.common.iterator.sequencefile.PathType;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
@@ -207,7 +208,7 @@ public final class BtJob {
       boolean distributedRHat = conf.get(PROP_RHAT_BROADCAST) != null;
       if (distributedRHat) {
 
-        Path[] rFiles = DistributedCache.getLocalCacheFiles(conf);
+        Path[] rFiles = HadoopUtil.getCachedFiles(conf);
 
         Validate.notNull(rFiles,
                          "no RHat files in distributed cache job definition");

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/pruner/WordsPrunerReducer.java Sun Jun  9 21:43:55 2013
@@ -16,15 +16,13 @@ package org.apache.mahout.vectorizer.pru
  * limitations under the License.
  */
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
 import org.apache.mahout.math.Vector;
@@ -72,19 +70,13 @@ public class WordsPrunerReducer extends
   protected void setup(Context context) throws IOException, InterruptedException {
     super.setup(context);
     Configuration conf = context.getConfiguration();
-    Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
-    Preconditions.checkArgument(localFiles != null && localFiles.length >= 1,
-            "missing paths from the DistributedCache");
+    Path[] localFiles = HadoopUtil.getCachedFiles(conf);
 
     maxDf = conf.getLong(HighDFWordsPruner.MAX_DF, Long.MAX_VALUE);
     minDf = conf.getLong(HighDFWordsPruner.MIN_DF, -1);
-    FileSystem fs = FileSystem.getLocal(conf);
-    Path dictionaryFile;
-    if (fs.exists(localFiles[0])) {
-      dictionaryFile = fs.makeQualified(localFiles[0]);
-    } else {//MAHOUT-992: this seems safe
-      dictionaryFile = fs.makeQualified(new Path(DistributedCache.getCacheFiles(conf)[0].getPath()));
-    }
+
+    Path dictionaryFile = HadoopUtil.getSingleCachedFile(conf);
+
     // key is feature, value is the document frequency
     for (Pair<IntWritable, LongWritable> record
             : new SequenceFileIterable<IntWritable, LongWritable>(dictionaryFile, true, conf)) {

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java Sun Jun  9 21:43:55 2013
@@ -17,12 +17,8 @@
 
 package org.apache.mahout.vectorizer.term;
 
-import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -30,6 +26,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.lucene.analysis.shingle.ShingleFilter;
 import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.StringTuple;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
@@ -46,7 +43,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Iterator;
 
@@ -121,34 +117,17 @@ public class TFPartialVectorReducer exte
   protected void setup(Context context) throws IOException, InterruptedException {
     super.setup(context);
     Configuration conf = context.getConfiguration();
-    Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
-    Preconditions.checkArgument(localFiles != null && localFiles.length >= 1,
-            "missing paths from the DistributedCache");
-    LocalFileSystem localFs = FileSystem.getLocal(conf);
-    if (!localFs.exists(localFiles[0])) {
-      log.info("Can't find dictionary dist. cache file, looking in .getCacheFiles");
-      URI[] filesURIs = DistributedCache.getCacheFiles(conf);
-      if (filesURIs == null) {
-        throw new IOException("Cannot read Frequency list from Distributed Cache");
-      }
-      if (filesURIs.length != 1) {
-        throw new IOException("Cannot read Frequency list from Distributed Cache (" + localFiles.length + ')');
-      }
-      localFiles[0] = new Path(filesURIs[0].getPath());
-    }
 
     dimension = conf.getInt(PartialVectorMerger.DIMENSION, Integer.MAX_VALUE);
     sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
     namedVector = conf.getBoolean(PartialVectorMerger.NAMED_VECTOR, false);
     maxNGramSize = conf.getInt(DictionaryVectorizer.MAX_NGRAMS, maxNGramSize);
-    if (log.isInfoEnabled()) {
-      log.info("Cache Files: " + Arrays.asList(localFiles));
-    }
+
     //MAHOUT-1247
-    localFiles[0] = localFs.makeQualified(localFiles[0]);
+    Path dictionaryFile = HadoopUtil.getSingleCachedFile(conf);
     // key is word value is id
     for (Pair<Writable, IntWritable> record
-            : new SequenceFileIterable<Writable, IntWritable>(localFiles[0], true, conf)) {
+            : new SequenceFileIterable<Writable, IntWritable>(dictionaryFile, true, conf)) {
       dictionary.put(record.getFirst().toString(), record.getSecond().get());
     }
   }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java Sun Jun  9 21:43:55 2013
@@ -18,19 +18,15 @@
 package org.apache.mahout.vectorizer.tfidf;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.Iterator;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
 import org.apache.mahout.math.NamedVector;
@@ -102,20 +98,7 @@ public class TFIDFPartialVectorReducer e
   protected void setup(Context context) throws IOException, InterruptedException {
     super.setup(context);
     Configuration conf = context.getConfiguration();
-    Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
-    Preconditions.checkArgument(localFiles != null && localFiles.length >= 1, 
-        "missing paths from the DistributedCache");
-    LocalFileSystem localFs = FileSystem.getLocal(conf);
-    if (!localFs.exists(localFiles[0])) {
-      URI[] filesURIs = DistributedCache.getCacheFiles(conf);
-      if (filesURIs == null) {
-        throw new IOException("Cannot read Frequency list from Distributed Cache");
-      }
-      if (filesURIs.length != 1) {
-        throw new IOException("Cannot read Frequency list from Distributed Cache (" + localFiles.length + ')');
-      }
-      localFiles[0] = new Path(filesURIs[0].getPath());
-    }
+
     vectorCount = conf.getLong(TFIDFConverter.VECTOR_COUNT, 1);
     featureCount = conf.getLong(TFIDFConverter.FEATURE_COUNT, 1);
     minDf = conf.getInt(TFIDFConverter.MIN_DF, 1);
@@ -123,7 +106,7 @@ public class TFIDFPartialVectorReducer e
     sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
     namedVector = conf.getBoolean(PartialVectorMerger.NAMED_VECTOR, false);
 
-    Path dictionaryFile = localFs.makeQualified(localFiles[0]);
+    Path dictionaryFile = HadoopUtil.getSingleCachedFile(conf);
     // key is feature, value is the document frequency
     for (Pair<IntWritable,LongWritable> record 
          : new SequenceFileIterable<IntWritable,LongWritable>(dictionaryFile, true, conf)) {

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/classifier/df/data/Utils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/classifier/df/data/Utils.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/classifier/df/data/Utils.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/classifier/df/data/Utils.java Sun Jun  9 21:43:55 2013
@@ -28,7 +28,6 @@ import com.google.common.io.Closeables;
 import com.google.common.io.Files;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.mahout.classifier.df.data.Dataset.Attribute;

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java Sun Jun  9 21:43:55 2013
@@ -20,7 +20,6 @@ package org.apache.mahout.clustering.str
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;

Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java Sun Jun  9 21:43:55 2013
@@ -17,19 +17,17 @@
 
 package org.apache.mahout.cf.taste.example.email;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
 import org.apache.mahout.math.map.OpenObjectIntHashMap;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.regex.Pattern;
 
 public final class EmailUtility {
@@ -64,9 +62,7 @@ public final class EmailUtility {
                                       String msgIdPrefix,
                                       OpenObjectIntHashMap<String> msgIdDictionary) throws IOException {
 
-    Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
-    Preconditions.checkArgument(localFiles != null,
-            "missing paths from the DistributedCache");
+    Path[] localFiles = HadoopUtil.getCachedFiles(conf);
     FileSystem fs = FileSystem.getLocal(conf);
     for (Path dictionaryFile : localFiles) {
 

Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java?rev=1491301&r1=1491300&r2=1491301&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java Sun Jun  9 21:43:55 2013
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.lucene.search.TermQuery;
 import org.apache.mahout.common.HadoopUtil;
-import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.iterator.sequencefile.PathFilters;
 import org.apache.mahout.common.iterator.sequencefile.PathType;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterator;
@@ -34,7 +33,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 
 import static java.util.Arrays.asList;