You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 05:40:08 UTC

svn commit: r1079151 - /hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/fs/LocalDirAllocator.java

Author: omalley
Date: Tue Mar  8 04:40:08 2011
New Revision: 1079151

URL: http://svn.apache.org/viewvc?rev=1079151&view=rev
Log:
commit 446e41f2424ddc866770cda677450c18feeb597f
Author: Chris Douglas <cd...@apache.org>
Date:   Sun Oct 3 02:00:15 2010 -0700

    Apply y201 to trunk

Modified:
    hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/fs/LocalDirAllocator.java

Modified: hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/fs/LocalDirAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/fs/LocalDirAllocator.java?rev=1079151&r1=1079150&r2=1079151&view=diff
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/fs/LocalDirAllocator.java (original)
+++ hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/fs/LocalDirAllocator.java Tue Mar  8 04:40:08 2011
@@ -128,8 +128,26 @@ public class LocalDirAllocator {
    */
   public Path getLocalPathForWrite(String pathStr, long size, 
       Configuration conf) throws IOException {
+	return getLocalPathForWrite(pathStr, size, conf, true);
+  }
+
+  /** Get a path from the local FS. Pass size as 
+   *  SIZE_UNKNOWN if not known a priori. We
+   *  round-robin over the set of disks (via the configured dirs) and return
+   *  the first complete path which has enough space.
+   *  @param pathStr the requested path (this will be created on the first 
+   *  available disk)
+   *  @param size the size of the file that is going to be written
+   *  @param conf the Configuration object
+   *  @param checkWrite ensure that the path is writable
+   *  @return the complete path to the file on a local disk
+   *  @throws IOException
+   */
+  public Path getLocalPathForWrite(String pathStr, long size, 
+                                   Configuration conf,
+                                   boolean checkWrite) throws IOException {
     AllocatorPerContext context = obtainContext(contextCfgItemName);
-    return context.getLocalPathForWrite(pathStr, size, conf);
+    return context.getLocalPathForWrite(pathStr, size, conf, checkWrite);
   }
   
   /** Get a path from the local FS for reading. We search through all the
@@ -146,6 +164,23 @@ public class LocalDirAllocator {
     return context.getLocalPathToRead(pathStr, conf);
   }
 
+  /**
+   * Get all of the paths that currently exist in the working directories.
+   * @param pathStr the path underneath the roots
+   * @param conf the configuration to look up the roots in
+   * @return all of the paths that exist under any of the roots
+   * @throws IOException
+   */
+  public Iterable<Path> getAllLocalPathsToRead(String pathStr, 
+                                               Configuration conf
+                                               ) throws IOException {
+    AllocatorPerContext context;
+    synchronized (this) {
+      context = obtainContext(contextCfgItemName);
+    }
+    return context.getAllLocalPathsToRead(pathStr, conf);    
+  }
+
   /** Creates a temporary file in the local FS. Pass size as -1 if not known 
    *  apriori. We round-robin over the set of disks (via the configured dirs) 
    *  and select the first complete path which has enough space. A file is
@@ -214,7 +249,8 @@ public class LocalDirAllocator {
     /** This method gets called everytime before any read/write to make sure
      * that any change to localDirs is reflected immediately.
      */
-    private void confChanged(Configuration conf) throws IOException {
+    private synchronized void confChanged(Configuration conf
+                                          ) throws IOException {
       String newLocalDirs = conf.get(contextCfgItemName);
       if (!newLocalDirs.equals(savedLocalDirs)) {
         localDirs = conf.getTrimmedStrings(contextCfgItemName);
@@ -252,18 +288,21 @@ public class LocalDirAllocator {
       }
     }
 
-    private Path createPath(String path) throws IOException {
+    private Path createPath(String path, 
+    		boolean checkWrite) throws IOException {
       Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
                                     path);
-      //check whether we are able to create a directory here. If the disk
-      //happens to be RDONLY we will fail
-      try {
-        DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
-        return file;
-      } catch (DiskErrorException d) {
-        LOG.warn(StringUtils.stringifyException(d));
-        return null;
+      if (checkWrite) {
+    	//check whether we are able to create a directory here. If the disk
+    	//happens to be RDONLY we will fail
+    	try {
+          DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
+    	} catch (DiskErrorException d) {
+          LOG.warn(StringUtils.stringifyException(d));
+          return null;
+    	}
       }
+      return file;
     }
 
     /**
@@ -274,17 +313,6 @@ public class LocalDirAllocator {
       return dirNumLastAccessed;
     }
     
-    /** Get a path from the local FS. This method should be used if the size of 
-     *  the file is not known a priori. 
-     *  
-     *  It will use roulette selection, picking directories
-     *  with probability proportional to their available space. 
-     */
-    public synchronized Path getLocalPathForWrite(String path, 
-        Configuration conf) throws IOException {
-      return getLocalPathForWrite(path, SIZE_UNKNOWN, conf);
-    }
-
     /** Get a path from the local FS. If size is known, we go
      *  round-robin over the set of disks (via the configured dirs) and return
      *  the first complete path which has enough space.
@@ -292,8 +320,10 @@ public class LocalDirAllocator {
      *  If size is not known, use roulette selection -- pick directories
      *  with probability proportional to their available space.
      */
-    public synchronized Path getLocalPathForWrite(String pathStr, long size, 
-        Configuration conf) throws IOException {
+    public synchronized 
+    Path getLocalPathForWrite(String pathStr, long size, 
+    	                      Configuration conf, boolean checkWrite
+    	                      ) throws IOException {
       confChanged(conf);
       int numDirs = localDirs.length;
       int numDirsSearched = 0;
@@ -325,7 +355,7 @@ public class LocalDirAllocator {
             dir++;
           }
           dirNumLastAccessed = dir;
-          returnPath = createPath(pathStr);
+          returnPath = createPath(pathStr, checkWrite);
           if (returnPath == null) {
             totalAvailable -= availableOnDisk[dir];
             availableOnDisk[dir] = 0; // skip this disk
@@ -336,7 +366,7 @@ public class LocalDirAllocator {
         while (numDirsSearched < numDirs && returnPath == null) {
           long capacity = dirDF[dirNumLastAccessed].getAvailable();
           if (capacity > size) {
-            returnPath = createPath(pathStr);
+            returnPath = createPath(pathStr, checkWrite);
           }
           dirNumLastAccessed++;
           dirNumLastAccessed = dirNumLastAccessed % numDirs; 
@@ -362,7 +392,7 @@ public class LocalDirAllocator {
         Configuration conf) throws IOException {
 
       // find an appropriate directory
-      Path path = getLocalPathForWrite(pathStr, size, conf);
+      Path path = getLocalPathForWrite(pathStr, size, conf, true);
       File dir = new File(path.getParent().toUri().getPath());
       String prefix = path.getName();
 
@@ -399,6 +429,76 @@ public class LocalDirAllocator {
       " the configured local directories");
     }
 
+    private static 
+    class PathIterator implements Iterator<Path>, Iterable<Path> {
+      private final FileSystem fs;
+      private final String pathStr;
+      private int i = 0;
+      private final String[] rootDirs;
+      private Path next = null;
+
+      private PathIterator(FileSystem fs, String pathStr, String[] rootDirs
+                           ) throws IOException {
+        this.fs = fs;
+        this.pathStr = pathStr;
+        this.rootDirs = rootDirs;
+        advance();
+      }
+
+      @Override
+      public boolean hasNext() {
+        return next != null;
+      }
+
+      private void advance() throws IOException {
+        while (i < rootDirs.length) {
+          next = new Path(rootDirs[i++], pathStr);
+          if (fs.exists(next)) {
+            return;
+          }
+        }
+        next = null;
+      }
+
+      @Override
+      public Path next() {
+        Path result = next;
+        try {
+          advance();
+        } catch (IOException ie) {
+          throw new RuntimeException("Can't check existance of " + next, ie);
+        }
+        return result;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException("read only iterator");
+      }
+
+      @Override
+      public Iterator<Path> iterator() {
+        return this;
+      }
+    }
+
+    /**
+     * Get all of the paths that currently exist in the working directories.
+     * @param pathStr the path underneath the roots
+     * @param conf the configuration to look up the roots in
+     * @return all of the paths that exist under any of the roots
+     * @throws IOException
+     */
+    synchronized Iterable<Path> getAllLocalPathsToRead(String pathStr,
+                                                       Configuration conf
+                                                       ) throws IOException {
+      confChanged(conf);
+      if (pathStr.startsWith("/")) {
+        pathStr = pathStr.substring(1);
+      }
+      return new PathIterator(localFS, pathStr, localDirs);
+    }
+
     /** We search through all the configured dirs for the file's existence
      *  and return true when we find one 
      */