Posted to common-commits@hadoop.apache.org by sr...@apache.org on 2011/05/06 04:11:32 UTC

svn commit: r1100026 [1/3] - in /hadoop/common/trunk: ./ src/java/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/fs/viewfs/ src/test/core/org/apache/hadoop/fs/ src/test/core/org/apache/hadoop/fs/viewfs/

Author: sradia
Date: Fri May  6 02:11:31 2011
New Revision: 1100026

URL: http://svn.apache.org/viewvc?rev=1100026&view=rev
Log:
HADOOP-7257 Client side mount tables (sanjay)

Added:
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/Constants.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/InodeTree.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ViewFs.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ViewFsFileStatus.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/TestChRootedFileSystem.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/TestChRootedFs.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/TestFSMainOperationsLocalFileSystem.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/TestFcCreateMkdirLocalFs.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/TestFcMainOperationsLocalFs.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/TestFcPermissionsLocalFs.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/TestViewFileSystemWithAuthorityLocalFileSystem.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/TestViewFsLocalFs.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/TestViewFsWithAuthorityLocalFs.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/TestViewfsFileStatus.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/ViewFileSystemTestSetup.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/ViewFsBaseTest.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java
Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/core-default.xml
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/FSMainOperationsBaseTest.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/FileContextPermissionBase.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/FileContextTestHelper.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/FileSystemTestHelper.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1100026&r1=1100025&r2=1100026&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Fri May  6 02:11:31 2011
@@ -22,6 +22,8 @@ Trunk (unreleased changes)
 
     HADOOP-7171. Support UGI in FileContext API. (jitendra)
 
+    HADOOP-7257 Client side mount tables (sanjay)
+
   IMPROVEMENTS
 
     HADOOP-7042. Updates to test-patch.sh to include failed test names and

Modified: hadoop/common/trunk/src/java/core-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/core-default.xml?rev=1100026&r1=1100025&r2=1100026&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/core-default.xml (original)
+++ hadoop/common/trunk/src/java/core-default.xml Fri May  6 02:11:31 2011
@@ -262,6 +262,13 @@
 </property>
 
 <property>
+  <name>fs.viewfs.impl</name>
+  <value>org.apache.hadoop.fs.viewfs.ViewFileSystem</value>
+  <description>The FileSystem for view file system for viewfs: uris
+  (i.e. client side mount table).</description>
+</property>
+
+<property>
   <name>fs.AbstractFileSystem.file.impl</name>
   <value>org.apache.hadoop.fs.local.LocalFs</value>
   <description>The AbstractFileSystem for file: uris.</description>
@@ -275,6 +282,13 @@
 </property>
 
 <property>
+  <name>fs.AbstractFileSystem.viewfs.impl</name>
+  <value>org.apache.hadoop.fs.viewfs.ViewFs</value>
+  <description>The AbstractFileSystem for view file system for viewfs: uris
+  (i.e. client side mount table).</description>
+</property>
+
+<property>
   <name>fs.s3.impl</name>
   <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
   <description>The FileSystem for s3: uris.</description>
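
The two properties above wire the "viewfs:" scheme to the new classes; the mount
table itself is supplied through separate fs.viewfs.mounttable.* keys (see the
Constants and InodeTree files below). A minimal client-side sketch, assuming a
made-up namenode nn1 and example paths (the ViewFsQuickStart class is
illustrative, not part of this patch):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ViewFsQuickStart {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // One entry in the default client-side mount table:
        // paths under /data are served by hdfs://nn1/projects/data.
        conf.set("fs.viewfs.mounttable.default.link./data",
            "hdfs://nn1/projects/data");
        // fs.viewfs.impl (above) maps viewfs: URIs to ViewFileSystem.
        FileSystem viewFs = FileSystem.get(new URI("viewfs:///"), conf);
        for (FileStatus st : viewFs.listStatus(new Path("/data"))) {
          System.out.println(st.getPath());
        }
      }
    }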

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java?rev=1100026&r1=1100025&r2=1100026&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java Fri May  6 02:11:31 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -290,18 +291,19 @@ public abstract class AbstractFileSystem
               + " is not valid");
     }
     String authority = uri.getAuthority();
-    if (!authorityNeeded) {
-      if (authority != null) {
-        throw new HadoopIllegalArgumentException("Scheme with non-null authority: "
-            + uri);
-      }
-      return new URI(supportedScheme + ":///");
-    }
     if (authority == null) {
-      throw new HadoopIllegalArgumentException("Uri without authority: " + uri);
+       if (authorityNeeded) {
+         throw new HadoopIllegalArgumentException("Uri without authority: " + uri);
+       } else {
+         return new URI(supportedScheme + ":///");
+       }   
     }
+    // authority is non-null - authorityNeeded may be true or false.
     int port = uri.getPort();
-    port = port == -1 ? defaultPort : port;
+    port = (port == -1 ? defaultPort : port);
+    if (port == -1) { // no port supplied and default port is not specified
+      return new URI(supportedScheme, authority, "/", null);
+    }
     return new URI(supportedScheme + "://" + uri.getHost() + ":" + port);
   }
   
@@ -432,6 +434,21 @@ public abstract class AbstractFileSystem
   public abstract FsServerDefaults getServerDefaults() throws IOException; 
 
   /**
+   * Return the fully-qualified path of path f, resolving the path
+   * through any internal symlinks or mount points.
+   * @param p path to be resolved
+   * @return fully qualified path
+   * @throws FileNotFoundException, AccessControlException, IOException,
+   *         UnresolvedLinkException if a symbolic link on the path cannot be
+   *         resolved internally
+   */
+   public Path resolvePath(final Path p) throws FileNotFoundException,
+           UnresolvedLinkException, AccessControlException, IOException {
+     checkPath(p);
+     return getFileStatus(p).getPath(); // default impl is to return the path
+   }
+  
+  /**
    * The specification of this method matches that of
    * {@link FileContext#create(Path, EnumSet, Options.CreateOpts...)} except
    * that the Path f must be fully qualified and the permission is absolute
@@ -905,11 +922,12 @@ public abstract class AbstractFileSystem
    * 
    * @param renewer the account name that is allowed to renew the token.
    * @return List of delegation tokens.
+   *   If delegation tokens are not supported, a list of size zero is returned.
    * @throws IOException
    */
   @InterfaceAudience.LimitedPrivate( { "HDFS", "MapReduce" })
   public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
-    return null;
+    return new ArrayList<Token<?>>(0);
   }
   
   @Override //Object
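
Since getDelegationTokens() now returns an empty list rather than null when
delegation tokens are unsupported, callers can iterate the result directly. A
hedged caller-side sketch (the TokenHelper class and printTokens method are
illustrative, not part of this patch):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.fs.AbstractFileSystem;
    import org.apache.hadoop.security.token.Token;

    class TokenHelper {
      static void printTokens(AbstractFileSystem fs, String renewer)
          throws IOException {
        // No null check needed: an empty list is returned when delegation
        // tokens are not supported by the underlying file system.
        List<Token<?>> tokens = fs.getDelegationTokens(renewer);
        for (Token<?> t : tokens) {
          System.out.println(t.getService());
        }
      }
    }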

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java?rev=1100026&r1=1100025&r2=1100026&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java Fri May  6 02:11:31 2011
@@ -29,10 +29,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.util.Progressable;
 
 /**

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java?rev=1100026&r1=1100025&r2=1100026&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java Fri May  6 02:11:31 2011
@@ -43,7 +43,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Options.CreateOpts;
-import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RpcClientException;
@@ -526,6 +525,15 @@ public final class FileContext {
   }
   
   /**
+   * Return the current user's home directory in this file system.
+   * The default implementation returns "/user/$USER/".
+   * @return the home directory
+   */
+  public Path getHomeDirectory() {
+    return defaultFS.getHomeDirectory();
+  }
+  
+  /**
    * 
    * @return the umask of this FileContext
    */

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java?rev=1100026&r1=1100025&r2=1100026&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java Fri May  6 02:11:31 2011
@@ -255,18 +255,18 @@ public class FileStatus implements Writa
   //////////////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, getPath().toString());
-    out.writeLong(length);
-    out.writeBoolean(isdir);
-    out.writeShort(block_replication);
-    out.writeLong(blocksize);
-    out.writeLong(modification_time);
-    out.writeLong(access_time);
-    permission.write(out);
-    Text.writeString(out, owner);
-    Text.writeString(out, group);
+    out.writeLong(getLen());
+    out.writeBoolean(isDirectory());
+    out.writeShort(getReplication());
+    out.writeLong(getBlockSize());
+    out.writeLong(getModificationTime());
+    out.writeLong(getAccessTime());
+    getPermission().write(out);
+    Text.writeString(out, getOwner());
+    Text.writeString(out, getGroup());
     out.writeBoolean(isSymlink());
     if (isSymlink()) {
-      Text.writeString(out, symlink.toString());
+      Text.writeString(out, getSymlink().toString());
     }
   }
 

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?rev=1100026&r1=1100025&r2=1100026&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Fri May  6 02:11:31 2011
@@ -50,8 +50,6 @@ import org.apache.hadoop.io.MultipleIOEx
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -360,12 +358,14 @@ public abstract class FileSystem extends
   }
     
   /**
+   * Deprecated - use {@link #getDelegationTokens(String)} instead.
    * Get a new delegation token for this file system.
    * @param renewer the account name that is allowed to renew the token.
    * @return a new delegation token
    * @throws IOException
    */
   @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
+  @Deprecated
   public Token<?> getDelegationToken(String renewer) throws IOException {
     return null;
   }
@@ -378,11 +378,12 @@ public abstract class FileSystem extends
    * 
    * @param renewer the account name that is allowed to renew the token.
    * @return list of new delegation tokens
+   *    If delegation tokens are not supported, a list of size zero is returned.
    * @throws IOException
    */
   @InterfaceAudience.LimitedPrivate( { "HDFS", "MapReduce" })
   public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
-    return null;
+    return new ArrayList<Token<?>>(0);
   }
 
   /** create a file with the provided permission
@@ -535,6 +536,18 @@ public abstract class FileSystem extends
         getDefaultReplication(), 
         conf.getInt("io.file.buffer.size", 4096));
   }
+  
+  /**
+   * Return the fully-qualified path of path f, resolving the path
+   * through any symlinks or mount points.
+   * @param p path to be resolved
+   * @return fully qualified path 
+   * @throws FileNotFoundException
+   */
+   public Path resolvePath(final Path p) throws IOException {
+     checkPath(p);
+     return getFileStatus(p).getPath();
+   }
 
   /**
    * Opens an FSDataInputStream at the indicated Path.
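
The new resolvePath() default implementation simply fully qualifies the path via
getFileStatus(); per the javadoc, implementations such as the new viewfs classes
may instead resolve through mount points. A small illustrative sketch (class and
method names are made up):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class ResolveExample {
      // Returns the fully qualified path, e.g. hdfs://nn1/user/alice/data,
      // or, for a mounted path, a path on the mount point's target fs.
      static Path qualify(Configuration conf, String p) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.resolvePath(new Path(p));
      }
    }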

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java?rev=1100026&r1=1100025&r2=1100026&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java Fri May  6 02:11:31 2011
@@ -94,7 +94,11 @@ public class FilterFileSystem extends Fi
     long len) throws IOException {
       return fs.getFileBlockLocations(file, start, len);
   }
-  
+
+  @Override
+  public Path resolvePath(final Path p) throws IOException {
+    return fs.resolvePath(p);
+  }
   /**
    * Opens an FSDataInputStream at the indicated Path.
    * @param f the file name to open
@@ -363,6 +367,7 @@ public class FilterFileSystem extends Fi
   }
 
   @Override
+  @SuppressWarnings("deprecation")
   protected boolean primitiveMkdir(Path f, FsPermission abdolutePermission)
       throws IOException {
     return fs.primitiveMkdir(f, abdolutePermission);
@@ -374,6 +379,7 @@ public class FilterFileSystem extends Fi
   }
   
   @Override // FileSystem
+  @SuppressWarnings("deprecation")
   public Token<?> getDelegationToken(String renewer) throws IOException {
     return fs.getDelegationToken(renewer);
   }

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java?rev=1100026&r1=1100025&r2=1100026&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java Fri May  6 02:11:31 2011
@@ -138,6 +138,13 @@ public abstract class FilterFs extends A
   public FsServerDefaults getServerDefaults() throws IOException {
     return myFs.getServerDefaults();
   }
+  
+
+  @Override
+  public Path resolvePath(final Path p) throws FileNotFoundException,
+        UnresolvedLinkException, AccessControlException, IOException {
+    return myFs.resolvePath(p);
+  }
 
   @Override
   public int getUriDefaultPort() {

Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java?rev=1100026&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java Fri May  6 02:11:31 2011
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * <code>ChRootedFileSystem</code> is a file system with its root some path
+ * below the root of its base file system. 
+ * 
+ * Example: For a base file system hdfs://nn1/ with chRoot at /user/foo, the
+ * members will be set up as shown below.
+ * <ul>
+ * <li>myFs is the base file system and points to hdfs at nn1</li>
+ * <li>myURI is hdfs://nn1/user/foo</li>
+ * <li>chRootPathPart is /user/foo</li>
+ * <li>workingDir is a directory related to chRoot</li>
+ * </ul>
+ * 
+ * The paths are resolved as follows by ChRootedFileSystem:
+ * <ul>
+ * <li> Absolute path /a/b/c is resolved to /user/foo/a/b/c at myFs</li>
+ * <li> Relative path x/y is resolved to /user/foo/<workingDir>/x/y</li>
+ * </ul>
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving /* Evolving for a release, to be changed to Stable */
+class ChRootedFileSystem extends FileSystem {
+  private final FileSystem myFs; // the base file system whose root is changed
+  private final URI myUri; // the base URI + the chRoot
+  private final Path chRootPathPart; // the root below the root of the base
+  private final String chRootPathPartString;
+  private Path workingDir;
+  
+  protected FileSystem getMyFs() {
+    return myFs;
+  }
+  
+  /**
+   * @param path
+   * @return  full path including the chroot 
+   */
+  protected Path fullPath(final Path path) {
+    super.checkPath(path);
+    return path.isAbsolute() ? 
+        new Path(chRootPathPartString + path.toUri().getPath()) :
+        new Path(chRootPathPartString + workingDir.toUri().getPath(), path);
+  }
+  
+  /**
+   * Constructor
+   * @param fs base file system
+   * @param theRoot chRoot for this file system
+   * @throws URISyntaxException
+   */
+  public ChRootedFileSystem(final FileSystem fs, final Path theRoot)
+    throws URISyntaxException {
+    myFs = fs;
+    myFs.makeQualified(theRoot); //check that root is a valid path for fs
+                            // Would like to call myFs.checkPath(theRoot); 
+                            // but not public
+    chRootPathPart = new Path(theRoot.toUri().getPath());
+    chRootPathPartString = chRootPathPart.toUri().getPath();
+    try {
+      initialize(fs.getUri(), fs.getConf());
+    } catch (IOException e) { // This exception should not be thrown
+      throw new RuntimeException("This should not occur");
+    }
+    
+    /*
+     * We are making URI include the chrootedPath: e.g. file:///chrootedPath.
+     * This is questionable since Path#makeQualified(uri, path) ignores
+     * the pathPart of a uri. Since this class is internal we can ignore
+     * this issue but if we were to make it external then this needs
+     * to be resolved.
+     */
+    // Handle the two cases:
+    //              scheme:/// and scheme://authority/
+    myUri = new URI(myFs.getUri().toString() + 
+        (myFs.getUri().getAuthority() == null ? "" :  Path.SEPARATOR) +
+          chRootPathPart.toString().substring(1));
+
+    workingDir = getHomeDirectory();
+    // We don't use the wd of myFs (let's set it to root anyway)
+    myFs.setWorkingDirectory(chRootPathPart);
+  }
+  
+  /** 
+   * Called after a new FileSystem instance is constructed.
+   * @param name a uri whose authority section names the host, port, etc.
+   *   for this FileSystem
+   * @param conf the configuration
+   */
+  public void initialize(final URI name, final Configuration conf)
+      throws IOException {
+    myFs.initialize(name, conf);
+    super.initialize(name, conf);
+    setConf(conf);
+  }
+
+  @Override
+  public URI getUri() {
+    return myUri;
+  }
+  
+  @Override
+  public Path makeQualified(final Path path) {
+    return myFs.makeQualified(path);
+    // NOT myFs.makeQualified(fullPath(path));
+  }
+ 
+  /**
+   * Strip out the root from the path.
+   * @param p - fully qualified path p
+   * @return the remaining path, without the beginning /
+   * @throws IOException if p is not prefixed with the root
+   */
+  String stripOutRoot(final Path p) throws IOException {
+    try {
+     checkPath(p);
+    } catch (IllegalArgumentException e) {
+      throw new IOException("Internal Error - path " + p +
+          " should have been with URI: " + myUri);
+    }
+    String pathPart = p.toUri().getPath();
+    return (pathPart.length() == chRootPathPartString.length()) ? "" : pathPart
+        .substring(chRootPathPartString.length() + 1);   
+  }
+  
+  @Override
+  protected Path getInitialWorkingDirectory() {
+    /*
+     * 3 choices here: 
+     *     null or / or /user/<uname> or strip the root out of myFs's
+     *  initial wd. 
+     * The only reasonable choice of initial wd for a chrooted fs is null, 
+     * so that the default rule for the wd is applied
+     */
+    return null;
+  }
+  
+  public Path getResolvedQualifiedPath(final Path f)
+      throws FileNotFoundException {
+    return myFs.makeQualified(
+        new Path(chRootPathPartString + f.toUri().toString()));
+  }
+  
+  @Override
+  public Path getHomeDirectory() {
+    return  new Path("/user/"+System.getProperty("user.name")).makeQualified(
+          getUri(), null);
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+  
+  @Override
+  public void setWorkingDirectory(final Path new_dir) {
+    workingDir = new_dir.isAbsolute() ? new_dir : new Path(workingDir, new_dir);
+  }
+
+  @Override
+  public FSDataOutputStream create(final Path f, final FsPermission permission,
+      final boolean overwrite, final int bufferSize, final short replication,
+      final long blockSize, final Progressable progress) throws IOException {
+    return myFs.create(fullPath(f), permission, overwrite, bufferSize,
+        replication, blockSize, progress);
+  }
+
+  @Override
+  public boolean delete(final Path f, final boolean recursive) 
+      throws IOException {
+    return myFs.delete(fullPath(f), recursive);
+  }
+  
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public boolean delete(Path f) throws IOException {
+   return delete(f, true);
+  }
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(final FileStatus fs, final long start,
+      final long len) throws IOException {
+    return myFs.getFileBlockLocations(
+        new ViewFsFileStatus(fs, fullPath(fs.getPath())), start, len);
+  }
+
+  @Override
+  public FileChecksum getFileChecksum(final Path f) 
+      throws IOException {
+    return myFs.getFileChecksum(fullPath(f));
+  }
+
+  @Override
+  public FileStatus getFileStatus(final Path f) 
+      throws IOException {
+    return myFs.getFileStatus(fullPath(f));
+  }
+
+  @Override
+  public FsStatus getStatus(Path p) throws IOException {
+    return myFs.getStatus(fullPath(p));
+  }
+
+  @Override
+  public FsServerDefaults getServerDefaults() throws IOException {
+    return myFs.getServerDefaults();
+  }
+
+  @Override
+  public FileStatus[] listStatus(final Path f) 
+      throws IOException {
+    return myFs.listStatus(fullPath(f));
+  }
+  
+  @Override
+  public boolean mkdirs(final Path f, final FsPermission permission)
+      throws IOException {
+    return myFs.mkdirs(fullPath(f), permission);
+  }
+
+  @Override
+  public FSDataInputStream open(final Path f, final int bufferSize) 
+    throws IOException {
+    return myFs.open(fullPath(f), bufferSize);
+  }
+  
+  @Override
+  public FSDataOutputStream append(final Path f, final int bufferSize,
+      final Progressable progress) throws IOException {
+    return myFs.append(fullPath(f), bufferSize, progress);
+  }
+
+  @Override
+  public boolean rename(final Path src, final Path dst) throws IOException {
+    // note fullPath will check that paths are relative to this FileSystem.
+    // Hence both are in same file system and a rename is valid
+    return myFs.rename(fullPath(src), fullPath(dst)); 
+  }
+  
+  @Override
+  public void setOwner(final Path f, final String username,
+      final String groupname)
+    throws IOException {
+    myFs.setOwner(fullPath(f), username, groupname);
+  }
+
+  @Override
+  public void setPermission(final Path f, final FsPermission permission)
+    throws IOException {
+    myFs.setPermission(fullPath(f), permission);
+  }
+
+  @Override
+  public boolean setReplication(final Path f, final short replication)
+    throws IOException {
+    return myFs.setReplication(fullPath(f), replication);
+  }
+
+  @Override
+  public void setTimes(final Path f, final long mtime, final long atime) 
+      throws IOException {
+    myFs.setTimes(fullPath(f), mtime, atime);
+  }
+
+  @Override
+  public void setVerifyChecksum(final boolean verifyChecksum)  {
+    myFs.setVerifyChecksum(verifyChecksum);
+  }
+  
+  @Override
+  public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
+    return myFs.getDelegationTokens(renewer);
+  }
+}
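
A same-package usage sketch (ChRootedFileSystem is package-private, so code like
this would live in org.apache.hadoop.fs.viewfs, e.g. in a test; the chroot
directory and class name are made-up examples):

    package org.apache.hadoop.fs.viewfs;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ChRootedExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem local = FileSystem.getLocal(conf);
        // Chroot the local file system at /tmp/chrootedRoot.
        ChRootedFileSystem chrooted =
            new ChRootedFileSystem(local, new Path("/tmp/chrootedRoot"));
        // The absolute path /foo/bar is resolved below the chroot, i.e. it
        // really lands in /tmp/chrootedRoot/foo/bar on the local fs.
        chrooted.mkdirs(new Path("/foo/bar"));
        FileStatus st = chrooted.getFileStatus(new Path("/foo/bar"));
        System.out.println(st.getPath());
      }
    }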

Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java?rev=1100026&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java Fri May  6 02:11:31 2011
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * <code>ChRootedFs</code> is a file system with its root some path
+ * below the root of its base file system.
+ * Example: For a base file system hdfs://nn1/ with chRoot at /user/foo, the
+ * members will be set up as shown below.
+ * <ul>
+ * <li>myFs is the base file system and points to hdfs at nn1</li>
+ * <li>myURI is hdfs://nn1/user/foo</li>
+ * <li>chRootPathPart is /user/foo</li>
+ * <li>workingDir is a directory related to chRoot</li>
+ * </ul>
+ * 
+ * The paths are resolved as follows by ChRootedFs:
+ * <ul>
+ * <li> Absolute path /a/b/c is resolved to /user/foo/a/b/c at myFs</li>
+ * <li> Relative path x/y is resolved to /user/foo/<workingDir>/x/y</li>
+ * </ul>
+
+ * 
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving /* Evolving for a release, to be changed to Stable */
+class ChRootedFs extends AbstractFileSystem {
+  private final AbstractFileSystem myFs;  // the base file system whose root is changed
+  private final URI myUri; // the base URI + the chroot
+  private final Path chRootPathPart; // the root below the root of the base
+  private final String chRootPathPartString;
+  
+  protected AbstractFileSystem getMyFs() {
+    return myFs;
+  }
+  
+  /**
+   * 
+   * @param path
+   * @return full path including the chroot
+   */
+  protected Path fullPath(final Path path) {
+    super.checkPath(path);
+    return new Path(chRootPathPartString + path.toUri().getPath());
+  }
+  
+  public ChRootedFs(final AbstractFileSystem fs, final Path theRoot)
+    throws URISyntaxException {
+    super(fs.getUri(), fs.getUri().getScheme(),
+        fs.getUri().getAuthority() != null, fs.getUriDefaultPort());
+    myFs = fs;
+    myFs.checkPath(theRoot);
+    chRootPathPart = new Path(myFs.getUriPath(theRoot));
+    chRootPathPartString = chRootPathPart.toUri().getPath();
+    /*
+     * We are making URI include the chrootedPath: e.g. file:///chrootedPath.
+     * This is questionable since Path#makeQualified(uri, path) ignores
+     * the pathPart of a uri. Since this class is internal we can ignore
+     * this issue but if we were to make it external then this needs
+     * to be resolved.
+     */
+    // Handle the two cases:
+    //              scheme:/// and scheme://authority/
+    myUri = new URI(myFs.getUri().toString() + 
+        (myFs.getUri().getAuthority() == null ? "" :  Path.SEPARATOR) +
+          chRootPathPart.toString().substring(1));
+    super.checkPath(theRoot);
+  }
+  
+  @Override
+  public URI getUri() {
+    return myUri;
+  }
+
+  
+  /**
+   *  
+   * Strip out the root from the path.
+   * 
+   * @param p - fully qualified path p
+   * @return the remaining path, without the beginning /
+   */
+  public String stripOutRoot(final Path p) {
+    try {
+     checkPath(p);
+    } catch (IllegalArgumentException e) {
+      throw new RuntimeException("Internal Error - path " + p +
+          " should have been with URI" + myUri);
+    }
+    String pathPart = p.toUri().getPath();
+    return  (pathPart.length() == chRootPathPartString.length()) ?
+        "" : pathPart.substring(chRootPathPartString.length() + 1);   
+  }
+  
+
+  @Override
+  public Path getHomeDirectory() {
+    return myFs.getHomeDirectory();
+  }
+  
+  @Override
+  public Path getInitialWorkingDirectory() {
+    /*
+     * 3 choices here: return null or / or strip the root out of myFs's
+     *  initial wd. 
+     * The only reasonable choice of initial wd for a chrooted fs is null 
+     */
+    return null;
+  }
+  
+  
+  public Path getResolvedQualifiedPath(final Path f)
+      throws FileNotFoundException {
+    return myFs.makeQualified(
+        new Path(chRootPathPartString + f.toUri().toString()));
+  }
+  
+  @Override
+  public FSDataOutputStream createInternal(final Path f,
+      final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
+      final int bufferSize, final short replication, final long blockSize,
+      final Progressable progress, final int bytesPerChecksum,
+      final boolean createParent) throws IOException, UnresolvedLinkException {
+    return myFs.createInternal(fullPath(f), flag,
+        absolutePermission, bufferSize,
+        replication, blockSize, progress, bytesPerChecksum, createParent);
+  }
+
+  @Override
+  public boolean delete(final Path f, final boolean recursive) 
+      throws IOException, UnresolvedLinkException {
+    return myFs.delete(fullPath(f), recursive);
+  }
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(final Path f, final long start,
+      final long len) throws IOException, UnresolvedLinkException {
+    return myFs.getFileBlockLocations(fullPath(f), start, len);
+  }
+
+  @Override
+  public FileChecksum getFileChecksum(final Path f) 
+      throws IOException, UnresolvedLinkException {
+    return myFs.getFileChecksum(fullPath(f));
+  }
+
+  @Override
+  public FileStatus getFileStatus(final Path f) 
+      throws IOException, UnresolvedLinkException {
+    return myFs.getFileStatus(fullPath(f));
+  }
+
+  @Override
+  public FileStatus getFileLinkStatus(final Path f) 
+    throws IOException, UnresolvedLinkException {
+    return myFs.getFileLinkStatus(fullPath(f));
+  }
+  
+  @Override
+  public FsStatus getFsStatus() throws IOException {
+    return myFs.getFsStatus();
+  }
+
+  @Override
+  public FsServerDefaults getServerDefaults() throws IOException {
+    return myFs.getServerDefaults();
+  }
+
+  @Override
+  public int getUriDefaultPort() {
+    return myFs.getUriDefaultPort();
+  }
+
+  @Override
+  public FileStatus[] listStatus(final Path f) 
+      throws IOException, UnresolvedLinkException {
+    return myFs.listStatus(fullPath(f));
+  }
+
+  @Override
+  public void mkdir(final Path dir, final FsPermission permission,
+      final boolean createParent) throws IOException, UnresolvedLinkException {
+    myFs.mkdir(fullPath(dir), permission, createParent);
+    
+  }
+
+  @Override
+  public FSDataInputStream open(final Path f, final int bufferSize) 
+    throws IOException, UnresolvedLinkException {
+    return myFs.open(fullPath(f), bufferSize);
+  }
+
+  @Override
+  public void renameInternal(final Path src, final Path dst)
+    throws IOException, UnresolvedLinkException {
+    // note fullPath will check that paths are relative to this FileSystem.
+    // Hence both are in same file system and a rename is valid
+    myFs.renameInternal(fullPath(src), fullPath(dst));
+  }
+  
+  @Override
+  public void renameInternal(final Path src, final Path dst, 
+      final boolean overwrite)
+    throws IOException, UnresolvedLinkException {
+    // note fullPath will check that paths are relative to this FileSystem.
+    // Hence both are in same file system and a rename is valid
+    myFs.renameInternal(fullPath(src), fullPath(dst), overwrite);
+  }
+
+  @Override
+  public void setOwner(final Path f, final String username,
+      final String groupname)
+    throws IOException, UnresolvedLinkException {
+    myFs.setOwner(fullPath(f), username, groupname);
+    
+  }
+
+  @Override
+  public void setPermission(final Path f, final FsPermission permission)
+    throws IOException, UnresolvedLinkException {
+    myFs.setPermission(fullPath(f), permission);
+  }
+
+  @Override
+  public boolean setReplication(final Path f, final short replication)
+    throws IOException, UnresolvedLinkException {
+    return myFs.setReplication(fullPath(f), replication);
+  }
+
+  @Override
+  public void setTimes(final Path f, final long mtime, final long atime) 
+      throws IOException, UnresolvedLinkException {
+    myFs.setTimes(fullPath(f), mtime, atime);
+  }
+
+  @Override
+  public void setVerifyChecksum(final boolean verifyChecksum) 
+      throws IOException, UnresolvedLinkException {
+    myFs.setVerifyChecksum(verifyChecksum);
+  }
+
+  @Override
+  public boolean supportsSymlinks() {
+    return myFs.supportsSymlinks();
+  }
+
+  @Override
+  public void createSymlink(final Path target, final Path link,
+      final boolean createParent) throws IOException, UnresolvedLinkException {
+    /*
+     * We leave the link alone:
+     * If qualified or link relative then of course it is okay.
+     * If absolute (ie / relative) then the link has to be resolved
+     * relative to the changed root.
+     */
+    myFs.createSymlink(fullPath(target), link, createParent);
+  }
+
+  @Override
+  public Path getLinkTarget(final Path f) throws IOException {
+    return myFs.getLinkTarget(fullPath(f));
+  }
+  
+  
+  @Override
+  public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
+    return myFs.getDelegationTokens(renewer);
+  }
+}
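
The same path mapping for the AbstractFileSystem variant, as a same-package
sketch (ChRootedFs is package-private; obtaining the base fs via
AbstractFileSystem.get() is an assumed usage, and the /user/foo chroot follows
the javadoc example above):

    package org.apache.hadoop.fs.viewfs;

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.AbstractFileSystem;
    import org.apache.hadoop.fs.Path;

    public class ChRootedFsMapping {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        AbstractFileSystem localFs =
            AbstractFileSystem.get(new URI("file:///"), conf);
        ChRootedFs chrooted = new ChRootedFs(localFs, new Path("/user/foo"));
        // The absolute path /a/b/c maps to /user/foo/a/b/c on the base fs.
        System.out.println(chrooted.fullPath(new Path("/a/b/c")));
        // stripOutRoot() is the inverse mapping and prints "a/b/c".
        System.out.println(chrooted.stripOutRoot(new Path("/user/foo/a/b/c")));
      }
    }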

Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java?rev=1100026&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java Fri May  6 02:11:31 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Utilities for config variables of the viewFs See {@link ViewFs}
+ */
+public class ConfigUtil {
+  /**
+   * Get the config variable prefix for the specified mount table
+   * @param mountTableName - the name of the mount table
+   * @return the config variable prefix for the specified mount table
+   */
+  public static String getConfigViewFsPrefix(final String mountTableName) {
+    return Constants.CONFIG_VIEWFS_PREFIX + "." + mountTableName;
+  }
+  
+  /**
+   * Get the config variable prefix for the default mount table
+   * @return the config variable prefix for the default mount table
+   */
+  public static String getConfigViewFsPrefix() {
+    return 
+      getConfigViewFsPrefix(Constants.CONFIG_VIEWFS_PREFIX_DEFAULT_MOUNT_TABLE);
+  }
+  
+  /**
+   * Add a link to the config for the specified mount table
+   * @param conf - add the link to this conf
+   * @param mountTableName
+   * @param src - the src path name
+   * @param target - the target URI link
+   */
+  public static void addLink(Configuration conf, final String mountTableName, 
+      final String src, final URI target) {
+    conf.set(getConfigViewFsPrefix(mountTableName) + "." +
+        Constants.CONFIG_VIEWFS_LINK + "." + src, target.toString());  
+  }
+  
+  /**
+   * Add a link to the config for the default mount table
+   * @param conf - add the link to this conf
+   * @param src - the src path name
+   * @param target - the target URI link
+   */
+  public static void addLink(final Configuration conf, final String src,
+      final URI target) {
+    addLink( conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, 
+        src, target);   
+  }
+}
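
A short sketch of what the default-mount-table addLink() overload produces (the
target URI is a made-up example and the AddLinkExample class is illustrative):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.viewfs.ConfigUtil;

    class AddLinkExample {
      static Configuration mountTable() throws Exception {
        Configuration conf = new Configuration();
        // Equivalent to:
        // conf.set("fs.viewfs.mounttable.default.link./data",
        //     "hdfs://nn1/projects/data");
        ConfigUtil.addLink(conf, "/data", new URI("hdfs://nn1/projects/data"));
        return conf;
      }
    }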

Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/Constants.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/Constants.java?rev=1100026&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/Constants.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/Constants.java Fri May  6 02:11:31 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * Config variable prefixes for ViewFs -
+ *     see {@link org.apache.hadoop.fs.viewfs.ViewFs} for examples.
+ * The mount table is specified in the config using these prefixes.
+ * See {@link org.apache.hadoop.fs.viewfs.ConfigUtil} for convenience lib.
+ */
+public interface Constants {
+  /**
+   * Prefix for the config variable prefix for the ViewFs mount-table
+   */
+  public static final String CONFIG_VIEWFS_PREFIX = "fs.viewfs.mounttable";
+  
+  /**
+   * Config variable name for the default mount table.
+   */
+  public static final String CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE = "default";
+  
+  /**
+   * Config variable full prefix for the default mount table.
+   */
+  public static final String CONFIG_VIEWFS_PREFIX_DEFAULT_MOUNT_TABLE = 
+          CONFIG_VIEWFS_PREFIX + "." + CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
+  
+  /**
+   * Config variable for specifying a simple link
+   */
+  public static final String CONFIG_VIEWFS_LINK = "link";
+  
+  /**
+   * Config variable for specifying a merge link
+   */
+  public static final String CONFIG_VIEWFS_LINK_MERGE = "linkMerge";
+  
+  /**
+   * Config variable for specifying a merge of the root of the mount-table
+   *  with the root of another file system. 
+   */
+  public static final String CONFIG_VIEWFS_LINK_MERGE_SLASH = "linkMergeSlash";
+
+  static public final FsPermission PERMISSION_RRR = 
+    new FsPermission((short) 0444);
+}
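
These constants translate into mount-table configuration keys such as the
following (hosts and paths are made-up examples; a linkMerge value is a
comma-separated list of target URIs, as parsed by InodeTree below):

    import org.apache.hadoop.conf.Configuration;

    class MountTableKeys {
      static Configuration example() {
        Configuration conf = new Configuration();
        // a simple link in the default mount table
        conf.set("fs.viewfs.mounttable.default.link./user", "hdfs://nn1/user");
        // a merge link: one mount point backed by two target directories
        conf.set("fs.viewfs.mounttable.default.linkMerge./shared",
            "hdfs://nn1/shared,hdfs://nn2/shared");
        return conf;
      }
    }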

Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/InodeTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/InodeTree.java?rev=1100026&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/InodeTree.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/InodeTree.java Fri May  6 02:11:31 2011
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+
+/**
+ * InodeTree implements a mount-table as a tree of inodes.
+ * It is used to implement ViewFs and ViewFileSystem.
+ * In order to use it the caller must subclass it and implement
+ * the abstract methods {@link #getTargetFileSystem(INodeDir)}, etc.
+ * 
+ * The mount table is initialized from the config variables as 
+ * specified in {@link ViewFs}.
+ *
+ * @param <T> is AbstractFileSystem or FileSystem
+ * 
+ * The three main methods are
+ * {@link #InodeTree(Configuration)} // constructor
+ * {@link #InodeTree(Configuration, String)} // constructor
+ * {@link #resolve(String, boolean)} 
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable 
+abstract class InodeTree<T> {
+  static enum ResultKind {isInternalDir, isExternalDir;};
+  static final Path SlashPath = new Path("/");
+  
+  final INodeDir<T> root; // the root of the mount table
+  
+  List<MountPoint<T>> mountPoints = new ArrayList<MountPoint<T>>();
+  
+  
+  static class MountPoint<T> {
+    String src;
+    INodeLink<T> target;
+    MountPoint(String srcPath, INodeLink<T> mountLink) {
+      src = srcPath;
+      target = mountLink;
+    }
+
+  }
+  
+  /**
+   * Breaks file path into component names.
+   * @param path
+   * @return array of path component names
+   */
+  static String[] breakIntoPathComponents(final String path) {
+    return path == null ? null : path.split(Path.SEPARATOR);
+  } 
+  
+  /**
+   * Internal class for inode tree
+   * @param <T>
+   */
+  abstract static class INode<T> {
+    final String fullPath; // the full path to the root
+    public INode(String pathToNode, UserGroupInformation aUgi) {
+      fullPath = pathToNode;
+    }
+  };
+
+  /**
+   * Internal class to represent an internal dir of the mount table
+   * @param <T>
+   */
+  static class INodeDir<T> extends INode<T> {
+    final Map<String,INode<T>> children = new HashMap<String,INode<T>>();
+    T InodeDirFs =  null; // file system of this internal directory of mountT
+    boolean isRoot = false;
+    
+    INodeDir(final String pathToNode, final UserGroupInformation aUgi) {
+      super(pathToNode, aUgi);
+    }
+
+    INode<T> resolve(final String pathComponent) throws FileNotFoundException {
+      final INode<T> result = resolveInternal(pathComponent);
+      if (result == null) {
+        throw new FileNotFoundException();
+      }
+      return result;
+    }
+    
+    INode<T> resolveInternal(final String pathComponent)
+        throws FileNotFoundException {
+      return children.get(pathComponent);
+    }
+    
+    INodeDir<T> addDir(final String pathComponent,
+        final UserGroupInformation aUgi)
+      throws FileAlreadyExistsException {
+      if (children.containsKey(pathComponent)) {
+        throw new FileAlreadyExistsException();
+      }
+      final INodeDir<T> newDir = new INodeDir<T>(fullPath+ (isRoot ? "" : "/") + 
+          pathComponent, aUgi);
+      children.put(pathComponent, newDir);
+      return newDir;
+    }
+    
+    void addLink(final String pathComponent, final INodeLink<T> link)
+      throws FileAlreadyExistsException {
+      if (children.containsKey(pathComponent)) {
+        throw new FileAlreadyExistsException();
+      }
+      children.put(pathComponent, link);
+    }
+  }
+
+  /**
+   * An internal class to represent a mount link.
+   * A mount link can be a single dir link or a merge dir link.
+
+   * A merge dir link is a merge (junction) of links to dirs:
+   * example: a merge of 2 dirs
+   *     /users -> hdfs:nn1//users
+   *     /users -> hdfs:nn2//users
+   * 
+   * For a merge, each target is checked to be a dir when created, but if a
+   * target is changed later it is then ignored (a dir with null entries)
+   */
+  static class INodeLink<T> extends INode<T> {
+    final boolean isMergeLink; // true if MergeLink
+    final URI[] targetDirLinkList;
+    final T targetFileSystem;   // file system object created from the link.
+    
+    /**
+     * Construct a mergeLink
+     */
+    INodeLink(final String pathToNode, final UserGroupInformation aUgi,
+        final T targetMergeFs, final URI[] aTargetDirLinkList) {
+      super(pathToNode, aUgi);
+      targetFileSystem = targetMergeFs;
+      targetDirLinkList = aTargetDirLinkList;
+      isMergeLink = true;
+    }
+    
+    /**
+     * Construct a simple link (i.e. not a mergeLink)
+     */
+    INodeLink(final String pathToNode, final UserGroupInformation aUgi,
+        final T targetFs, final URI aTargetDirLink) {
+      super(pathToNode, aUgi);
+      targetFileSystem = targetFs;
+      targetDirLinkList = new URI[1];
+      targetDirLinkList[0] = aTargetDirLink;
+      isMergeLink = false;
+    }
+    
+    /**
+     * Get the target of the link.
+     * If it is a merge link then it is returned as a "," separated URI list.
+     */
+    Path getTargetLink() {
+      // is merge link - use "," as separator between the merged URIs
+      //String result = targetDirLinkList[0].toString();
+      StringBuilder result = new StringBuilder(targetDirLinkList[0].toString());
+      for (int i=1; i < targetDirLinkList.length; ++i) { 
+        result.append(',').append(targetDirLinkList[i].toString());
+      }
+      return new Path(result.toString());
+    }
+  }
+
+
+  private void createLink(final String src, final String target,
+      final boolean isLinkMerge, final UserGroupInformation aUgi)
+      throws URISyntaxException, IOException,
+    FileAlreadyExistsException, UnsupportedFileSystemException {
+    // Validate that src is valid absolute path
+    final Path srcPath = new Path(src); 
+    if (!srcPath.isAbsoluteAndSchemeAuthorityNull()) {
+      throw new IOException("ViewFs:Non absolute mount name in config:" + src);
+    }
+ 
+    final String[] srcPaths = breakIntoPathComponents(src);
+    INodeDir<T> curInode = root;
+    int i;
+    // Ignore first initial slash, process all except last component
+    for (i = 1; i < srcPaths.length-1; i++) {
+      final String iPath = srcPaths[i];
+      INode<T> nextInode = curInode.resolveInternal(iPath);
+      if (nextInode == null) {
+        INodeDir<T> newDir = curInode.addDir(iPath, aUgi);
+        newDir.InodeDirFs = getTargetFileSystem(newDir);
+        nextInode = newDir;
+      }
+      if (nextInode instanceof INodeLink) {
+        // Error - expected a dir but got a link
+        throw new FileAlreadyExistsException("Path " + nextInode.fullPath +
+            " already exists as link");
+      } else {
+        assert(nextInode instanceof INodeDir);
+        curInode = (INodeDir<T>) nextInode;
+      }
+    }
+    
+    // Now process the last component
+    // Add the link in 2 cases: does not exist or a link exists
+    String iPath = srcPaths[i];// last component
+    if (curInode.resolveInternal(iPath) != null) {
+      //  directory/link already exists
+      StringBuilder strB = new StringBuilder(srcPaths[0]);
+      for (int j = 1; j <= i; ++j) {
+        strB.append('/').append(srcPaths[j]);
+      }
+      throw new FileAlreadyExistsException("Path " + strB +
+            " already exists as dir; cannot create link here");
+    }
+    
+    final INodeLink<T> newLink;
+    final String fullPath = curInode.fullPath + (curInode == root ? "" : "/")
+        + iPath;
+    if (isLinkMerge) { // Target is list of URIs
+      String[] targetsList = StringUtils.getStrings(target);
+      URI[] targetsListURI = new URI[targetsList.length];
+      int k = 0;
+      for (String itarget : targetsList) {
+        targetsListURI[k++] = new URI(itarget);
+      }
+      newLink = new INodeLink<T>(fullPath, aUgi,
+          getTargetFileSystem(targetsListURI), targetsListURI);
+    } else {
+      newLink = new INodeLink<T>(fullPath, aUgi,
+          getTargetFileSystem(new URI(target)), new URI(target));
+    }
+    curInode.addLink(iPath, newLink);
+    mountPoints.add(new MountPoint<T>(src, newLink));
+  }
+  
+  /**
+   * Below the "public" methods of InodeTree
+   */
+  
+  /**
+   * The user of this class must subclass and implement the following
+   * 3 abstract methods.
+   * @throws IOException 
+   */
+  protected abstract T getTargetFileSystem(final URI uri)
+    throws UnsupportedFileSystemException, URISyntaxException, IOException;
+  
+  protected abstract T getTargetFileSystem(final INodeDir<T> dir)
+    throws URISyntaxException;
+  
+  protected abstract T getTargetFileSystem(final URI[] mergeFsURIList)
+  throws UnsupportedFileSystemException, URISyntaxException;
+  
+  /**
+   * Create Inode Tree from the specified mount-table specified in Config
+   * @param config - the mount table keys are prefixed with 
+   *       FsConstants.CONFIG_VIEWFS_PREFIX
+   * @param viewName - the name of the mount table - if null use defaultMT name
+   * @throws UnsupportedFileSystemException
+   * @throws URISyntaxException
+   * @throws FileAlreadyExistsException
+   * @throws IOException
+   */
+  protected InodeTree(final Configuration config, final String viewName)
+      throws UnsupportedFileSystemException, URISyntaxException,
+    FileAlreadyExistsException, IOException { 
+    String vName = viewName;
+    if (vName == null) {
+      vName = Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
+    }
+    root = new INodeDir<T>("/", UserGroupInformation.getCurrentUser());
+    root.InodeDirFs = getTargetFileSystem(root);
+    root.isRoot = true;
+    
+    final String mtPrefix = Constants.CONFIG_VIEWFS_PREFIX + "." + 
+                            vName + ".";
+    final String linkPrefix = Constants.CONFIG_VIEWFS_LINK + ".";
+    final String linkMergePrefix = Constants.CONFIG_VIEWFS_LINK_MERGE + ".";
+    boolean gotMountTableEntry = false;
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    for (Entry<String, String> si : config) {
+      final String key = si.getKey();
+      if (key.startsWith(mtPrefix)) {
+        gotMountTableEntry = true;
+        boolean isMergeLink = false;
+        String src = key.substring(mtPrefix.length());
+        if (src.startsWith(linkPrefix)) {
+          src = src.substring(linkPrefix.length());
+        } else if (src.startsWith(linkMergePrefix)) { // A merge link
+          isMergeLink = true;
+          src = src.substring(linkMergePrefix.length());
+        } else {
+          throw new IOException(
+              "ViewFs: Cannot initialize: Invalid entry in Mount table in config: "
+              + src);
+        }
+        final String target = si.getValue(); // link or merge link
+        createLink(src, target, isMergeLink, ugi); 
+      }
+    }
+    if (!gotMountTableEntry) {
+      throw new IOException(
+          "ViewFs: Cannot initialize: Empty Mount table in config for " + 
+             vName == null ? "viewfs:///" : ("viewfs://" + vName + "/"));
+    }
+  }
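+
+  // For illustration, a sketch of the expected configuration (hypothetical
+  // host names; assumes the Constants prefixes resolve to
+  // "fs.viewfs.mounttable", "link" and a default table name of "default"):
+  //
+  //   <property>
+  //     <name>fs.viewfs.mounttable.default.link./user</name>
+  //     <value>hdfs://nn1.example.com/user</value>
+  //   </property>
+  //   <property>
+  //     <name>fs.viewfs.mounttable.default.link./tmp</name>
+  //     <value>hdfs://nn2.example.com/tmp</value>
+  //   </property>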
+
+  /**
+   * Resolve returns ResolveResult.
+   * The caller can continue the resolution of the remainingPath
+   * in the targetFileSystem.
+   * 
+   * If the input pathname leads to a link to another file system then
+   * the targetFileSystem is the one denoted by the link (except that it is
+   * a file system chrooted to the link target).
+   * If the input pathname leads to an internal mount-table entry then
+   * the target file system is one that represents the internal inode.
+   */
+  static class ResolveResult<T> {
+    final ResultKind kind;
+    final T targetFileSystem;
+    final String resolvedPath;
+    final Path remainingPath;   // to resolve in the target FileSystem
+    
+    ResolveResult(final ResultKind k, final T targetFs, final String resolveP,
+        final Path remainingP) {
+      kind = k;
+      targetFileSystem = targetFs;
+      resolvedPath = resolveP;
+      remainingPath = remainingP;  
+    }
+    
+    // Returns true if the path resolution completed within the mount table
+    boolean isInternalDir() {
+      return (kind == ResultKind.isInternalDir);
+    }
+  }
+  
+  /**
+   * Resolve the pathname p relative to root InodeDir
+   * @param p - input path
+   * @param resolveLastComponent - if true, resolve the last path component
+   *        through a mount link as well
+   * @return ResolveResult which allows further resolution of the remaining path
+   * @throws FileNotFoundException
+   */
+  ResolveResult<T> resolve(final String p, final boolean resolveLastComponent)
+    throws FileNotFoundException {
+    // TO DO: - more efficient to not split the path, but simply compare
+    String[] path = breakIntoPathComponents(p); 
+    if (path.length <= 1) { // special case for when path is "/"
+      ResolveResult<T> res = 
+        new ResolveResult<T>(ResultKind.isInternalDir, 
+              root.InodeDirFs, root.fullPath, SlashPath);
+      return res;
+    }
+    
+    INodeDir<T> curInode = root;
+    int i;
+    // ignore first slash
+    for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
+      INode<T> nextInode = curInode.resolveInternal(path[i]);
+      if (nextInode == null) {
+        StringBuilder failedAt = new StringBuilder(path[0]);
+        for ( int j = 1; j <=i; ++j) {
+          failedAt.append('/').append(path[j]);
+        }
+        throw (new FileNotFoundException(failedAt.toString()));      
+      }
+
+      if (nextInode instanceof INodeLink) {
+        final INodeLink<T> link = (INodeLink<T>) nextInode;
+        final Path remainingPath;
+        if (i >= path.length-1) {
+          remainingPath = SlashPath;
+        } else {
+          StringBuilder remainingPathStr = new StringBuilder("/" + path[i+1]);
+          for (int j = i+2; j< path.length; ++j) {
+            remainingPathStr.append('/').append(path[j]);
+          }
+          remainingPath = new Path(remainingPathStr.toString());
+        }
+        final ResolveResult<T> res = 
+          new ResolveResult<T>(ResultKind.isExternalDir,
+              link.targetFileSystem, nextInode.fullPath, remainingPath);
+        return res;
+      } else if (nextInode instanceof INodeDir) {
+        curInode = (INodeDir<T>) nextInode;
+      }
+    }
+
+    // We have resolved to an internal dir in mount table.
+    Path remainingPath;
+    if (resolveLastComponent) {
+      remainingPath = SlashPath;
+    } else {
+      // note we have taken care of when path is "/" above
+      // for internal dirs rem-path does not start with / since the lookup
+      // that follows will do a children.get(remainingPath) and will have to
+      // strip out the initial /
+      StringBuilder remainingPathStr = new StringBuilder("/" + path[i]);
+      for (int j = i+1; j< path.length; ++j) {
+        remainingPathStr.append('/').append(path[j]);
+      }
+      remainingPath = new Path(remainingPathStr.toString());
+    }
+    final ResolveResult<T> res = 
+       new ResolveResult<T>(ResultKind.isInternalDir,
+           curInode.InodeDirFs, curInode.fullPath, remainingPath); 
+    return res;
+  }
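+  
+  // For illustration: with a mount link /user -> hdfs://nn1/user,
+  // resolve("/user/joe/f", true) returns the chrooted target file system of
+  // that link with resolvedPath "/user" and remainingPath "/joe/f";
+  // resolve("/", true) returns the internal root dir with remainingPath "/".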
+  
+  List<MountPoint<T>> getMountPoints() { 
+    return mountPoints;
+  }
+}

Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java?rev=1100026&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java Fri May  6 02:11:31 2011
@@ -0,0 +1,685 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import static org.apache.hadoop.fs.viewfs.Constants.PERMISSION_RRR;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.viewfs.InodeTree.INode;
+import org.apache.hadoop.fs.viewfs.InodeTree.INodeLink;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * ViewFileSystem (extends the FileSystem interface) implements a client-side
+ * mount table. Its spec and implementation is identical to {@link ViewFs}.
+ */
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving /* Evolving for a release, to be changed to Stable */
+public class ViewFileSystem extends FileSystem {
+  static AccessControlException readOnlyMountTable(final String operation,
+      final String p) {
+    return new AccessControlException(
+        "InternalDir of ViewFileSystem is readonly; operation=" + operation +
+        " Path=" + p);
+  }
+  static AccessControlException readOnlyMountTable(final String operation,
+      final Path p) {
+    return readOnlyMountTable(operation, p.toString());
+  }
+  
+  static public class MountPoint {
+    private Path src;       // the src of the mount
+    private URI[] targets; //  target of the mount; Multiple targets imply mergeMount
+    MountPoint(Path srcPath, URI[] targetURIs) {
+      src = srcPath;
+      targets = targetURIs;
+    }
+    Path getSrc() {
+      return src;
+    }
+    URI[] getTargets() {
+      return targets;
+    }
+  }
+  
+  final long creationTime; // of the mount table
+  final UserGroupInformation ugi; // the user/group of user who created mtable
+  URI myUri;
+  private Path workingDir;
+  Configuration config;
+  InodeTree<FileSystem> fsState;  // the fs state; ie the mount table
+  
+  /**
+   * Returns false for path names that contain a "." or ".." component,
+   * or a ":" anywhere within a component.
+   */
+  private static boolean isValidName(final String src) {
+    // Check for ".." "." ":" "/"
+    final StringTokenizer tokens = new StringTokenizer(src, Path.SEPARATOR);
+    while(tokens.hasMoreTokens()) {
+      String element = tokens.nextToken();
+      if (element.equals("..") || 
+          element.equals(".")  ||
+          (element.indexOf(":") >= 0)) {
+        return false;
+      }
+    }
+    return true;
+  }
+  
+  /**
+   * Make the path Absolute and get the path-part of a pathname.
+   * Checks that URI matches this file system 
+   * and that the path-part is a valid name.
+   * 
+   * @param p path
+   * @return path-part of the Path p
+   */
+  private String getUriPath(final Path p) {
+    checkPath(p);
+    String s = makeAbsolute(p).toUri().getPath();
+    if (!isValidName(s)) {
+      throw new InvalidPathException("Path part " + s + " from URI" + p
+          + " is not a valid filename.");
+    }
+    return s;
+  }
+  
+  private Path makeAbsolute(final Path f) {
+    return f.isAbsolute() ? f : new Path(workingDir, f);
+  }
+  
+  /**
+   * This is the constructor with the signature needed by
+   * {@link FileSystem#createFileSystem(URI, Configuration)}
+   * 
+   * After this constructor is called initialize() is called.
+   * @throws IOException 
+   */
+  public ViewFileSystem() throws IOException {
+    ugi = UserGroupInformation.getCurrentUser();
+    creationTime = System.currentTimeMillis();
+  }
+
+  /**
+   * Called after a new FileSystem instance is constructed.
+   * @param theUri a uri whose authority section names the host, port, etc. for
+   *          this FileSystem
+   * @param conf the configuration
+   */
+  public void initialize(final URI theUri, final Configuration conf)
+      throws IOException {
+    super.initialize(theUri, conf);
+    setConf(conf);
+    config = conf;
+    // Now build the client-side view (i.e. client-side mount table) from the config.
+    final String authority = theUri.getAuthority();
+    try {
+      myUri = new URI(FsConstants.VIEWFS_SCHEME, authority, "/", null, null);
+      workingDir =
+        this.makeQualified(new Path("/user/" + ugi.getShortUserName()));
+      fsState = new InodeTree<FileSystem>(conf, authority) {
+
+        @Override
+        protected
+        FileSystem getTargetFileSystem(final URI uri)
+          throws URISyntaxException, IOException {
+            return new ChRootedFileSystem(FileSystem.get(uri, config), 
+                new Path(uri.getPath()));
+        }
+
+        @Override
+        protected
+        FileSystem getTargetFileSystem(final INodeDir<FileSystem> dir)
+          throws URISyntaxException {
+          return new InternalDirOfViewFs(dir, creationTime, ugi, myUri);
+        }
+
+        @Override
+        protected
+        FileSystem getTargetFileSystem(URI[] mergeFsURIList)
+            throws URISyntaxException, UnsupportedFileSystemException {
+          throw new UnsupportedFileSystemException("mergefs not implemented");
+          // return MergeFs.createMergeFs(mergeFsURIList, config);
+        }
+      };
+    } catch (URISyntaxException e) {
+      throw new IOException("URISyntax exception: " + theUri);
+    }
+
+  }
+  
+  
+  /**
+   * Convenience Constructor for apps to call directly
+   * @param theUri which must be that of ViewFileSystem
+   * @param conf
+   * @throws IOException
+   */
+  ViewFileSystem(final URI theUri, final Configuration conf)
+    throws IOException {
+    this();
+    initialize(theUri, conf);
+  }
+  
+  /**
+   * Convenience Constructor for apps to call directly
+   * @param conf
+   * @throws IOException
+   */
+  public ViewFileSystem(final Configuration conf) throws IOException {
+    this(FsConstants.VIEWFS_URI, conf);
+  }
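+  
+  // For illustration, typical application usage goes through the FileSystem
+  // factory rather than these constructors (hypothetical snippet; the mount
+  // table entries must already be present in conf):
+  //
+  //   Configuration conf = new Configuration();
+  //   FileSystem viewFs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
+  //   FileStatus[] users = viewFs.listStatus(new Path("/user"));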
+  
+  public Path getTrashCanLocation(final Path f) throws FileNotFoundException {
+    final InodeTree.ResolveResult<FileSystem> res = 
+      fsState.resolve(getUriPath(f), true);
+    return res.isInternalDir() ? null : res.targetFileSystem.getHomeDirectory();
+  }
+
+  @Override
+  public URI getUri() {
+    return myUri;
+  }
+  
+  @Override
+  public Path resolvePath(final Path f)
+      throws IOException {
+    final InodeTree.ResolveResult<FileSystem> res;
+      res = fsState.resolve(getUriPath(f), true);
+    if (res.isInternalDir()) {
+      return f;
+    }
+    return res.targetFileSystem.resolvePath(res.remainingPath);
+  }
+  
+  @Override
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+
+  @Override
+  public void setWorkingDirectory(final Path new_dir) {
+    getUriPath(new_dir); // this validates the path
+    workingDir = makeAbsolute(new_dir);
+  }
+  
+  @Override
+  public FSDataOutputStream append(final Path f, final int bufferSize,
+      final Progressable progress) throws IOException {
+    InodeTree.ResolveResult<FileSystem> res = 
+      fsState.resolve(getUriPath(f), true);
+    return res.targetFileSystem.append(res.remainingPath, bufferSize, progress);
+  }
+  
+  @Override
+  public FSDataOutputStream create(final Path f, final FsPermission permission,
+      final boolean overwrite, final int bufferSize, final short replication,
+      final long blockSize, final Progressable progress) throws IOException {
+    InodeTree.ResolveResult<FileSystem> res;
+    try {
+      res = fsState.resolve(getUriPath(f), false);
+    } catch (FileNotFoundException e) {
+        throw readOnlyMountTable("create", f);
+    }
+    assert(res.remainingPath != null);
+    return res.targetFileSystem.create(res.remainingPath, permission,
+         overwrite, bufferSize, replication, blockSize, progress);
+  }
+
+  
+  @Override
+  public boolean delete(final Path f, final boolean recursive)
+      throws AccessControlException, FileNotFoundException,
+      IOException {
+    InodeTree.ResolveResult<FileSystem> res = 
+      fsState.resolve(getUriPath(f), true);
+    // If internal dir or target is a mount link (ie remainingPath is Slash)
+    if (res.isInternalDir() || res.remainingPath == InodeTree.SlashPath) {
+      throw readOnlyMountTable("delete", f);
+    }
+    return res.targetFileSystem.delete(res.remainingPath, recursive);
+  }
+  
+  @Override
+  @SuppressWarnings("deprecation")
+  public boolean delete(final Path f)
+      throws AccessControlException, FileNotFoundException,
+      IOException {
+      return delete(f, true);
+  }
+  
+  @Override
+  public BlockLocation[] getFileBlockLocations(FileStatus fs, 
+      long start, long len) throws IOException {
+    final InodeTree.ResolveResult<FileSystem> res = 
+      fsState.resolve(getUriPath(fs.getPath()), true);
+    return res.targetFileSystem.getFileBlockLocations(
+          new ViewFsFileStatus(fs, res.remainingPath), start, len);
+  }
+
+  @Override
+  public FileChecksum getFileChecksum(final Path f)
+      throws AccessControlException, FileNotFoundException,
+      IOException {
+    InodeTree.ResolveResult<FileSystem> res = 
+      fsState.resolve(getUriPath(f), true);
+    return res.targetFileSystem.getFileChecksum(res.remainingPath);
+  }
+
+  @Override
+  public FileStatus getFileStatus(final Path f) throws AccessControlException,
+      FileNotFoundException, IOException {
+    InodeTree.ResolveResult<FileSystem> res = 
+      fsState.resolve(getUriPath(f), true);
+    
+    // FileStatus#getPath is a fully qualified path relative to the root of 
+    // target file system.
+    // We need to change it to viewfs URI - relative to root of mount table.
+    
+    // The implementors of RawLocalFileSystem were trying to be very smart.
+    // They implement FileStatus#getOwner lazily -- the FileStatus object
+    // returned expects FileStatus#getPath to be unchanged so that it can
+    // fetch the owner when needed.
+    // Hence we need to interpose a new ViewFsFileStatus that
+    // works around this.
+    FileStatus status =  res.targetFileSystem.getFileStatus(res.remainingPath);
+    return new ViewFsFileStatus(status, this.makeQualified(f));
+  }
+  
+  
+  @Override
+  public FileStatus[] listStatus(final Path f) throws AccessControlException,
+      FileNotFoundException, IOException {
+    InodeTree.ResolveResult<FileSystem> res =
+      fsState.resolve(getUriPath(f), true);
+    
+    FileStatus[] statusLst = res.targetFileSystem.listStatus(res.remainingPath);
+    if (!res.isInternalDir()) {
+      // We need to change the name in the FileStatus as described in
+      // {@link #getFileStatus }
+      ChRootedFileSystem targetFs;
+      targetFs = (ChRootedFileSystem) res.targetFileSystem;
+      int i = 0;
+      for (FileStatus status : statusLst) {
+          String suffix = targetFs.stripOutRoot(status.getPath());
+          statusLst[i++] = new ViewFsFileStatus(status, this.makeQualified(
+              suffix.length() == 0 ? f : new Path(res.resolvedPath, suffix)));
+      }
+    }
+    return statusLst;
+  }
+
+  @Override
+  public boolean mkdirs(final Path dir, final FsPermission permission)
+      throws IOException {
+    InodeTree.ResolveResult<FileSystem> res = 
+      fsState.resolve(getUriPath(dir), false);
+    return res.targetFileSystem.mkdirs(res.remainingPath, permission);
+  }
+
+  @Override
+  public FSDataInputStream open(final Path f, final int bufferSize)
+      throws AccessControlException, FileNotFoundException,
+      IOException {
+    InodeTree.ResolveResult<FileSystem> res = 
+        fsState.resolve(getUriPath(f), true);
+    return res.targetFileSystem.open(res.remainingPath, bufferSize);
+  }
+
+  
+  @Override
+  public boolean rename(final Path src, final Path dst) throws IOException {
+    // passing resolveLastComponent as false to catch renaming a mount point to 
+    // itself. We need to catch this as an internal operation and fail.
+    InodeTree.ResolveResult<FileSystem> resSrc = 
+      fsState.resolve(getUriPath(src), false); 
+  
+    if (resSrc.isInternalDir()) {
+      throw readOnlyMountTable("rename", src);
+    }
+      
+    InodeTree.ResolveResult<FileSystem> resDst = 
+      fsState.resolve(getUriPath(dst), false);
+    if (resDst.isInternalDir()) {
+          throw readOnlyMountTable("rename", dst);
+    }
+    /*
+    // Alternate 1: renames within the same file system - valid but we disallow
+    // Alternate 2: (as described in the next paragraph) - valid but we have disallowed it
+    //
+    // Note we compare the URIs. the URIs include the link targets. 
+    // hence we allow renames across mount links as long as the mount links
+    // point to the same target.
+    if (!resSrc.targetFileSystem.getUri().equals(
+              resDst.targetFileSystem.getUri())) {
+      throw new IOException("Renames across Mount points not supported");
+    }
+    */
+    
+    //
+    // Alternate 3: renames ONLY within the same mount link.
+    //
+    if (resSrc.targetFileSystem != resDst.targetFileSystem) {
+      throw new IOException("Renames across Mount points not supported");
+    }
+    return resSrc.targetFileSystem.rename(resSrc.remainingPath,
+        resDst.remainingPath);
+  }
+  
+  @Override
+  public void setOwner(final Path f, final String username,
+      final String groupname) throws AccessControlException,
+      FileNotFoundException,
+      IOException {
+    InodeTree.ResolveResult<FileSystem> res = 
+      fsState.resolve(getUriPath(f), true);
+    res.targetFileSystem.setOwner(res.remainingPath, username, groupname); 
+  }
+
+  @Override
+  public void setPermission(final Path f, final FsPermission permission)
+      throws AccessControlException, FileNotFoundException,
+      IOException {
+    InodeTree.ResolveResult<FileSystem> res = 
+      fsState.resolve(getUriPath(f), true);
+    res.targetFileSystem.setPermission(res.remainingPath, permission); 
+  }
+
+  @Override
+  public boolean setReplication(final Path f, final short replication)
+      throws AccessControlException, FileNotFoundException,
+      IOException {
+    InodeTree.ResolveResult<FileSystem> res = 
+      fsState.resolve(getUriPath(f), true);
+    return res.targetFileSystem.setReplication(res.remainingPath, replication);
+  }
+
+  @Override
+  public void setTimes(final Path f, final long mtime, final long atime)
+      throws AccessControlException, FileNotFoundException,
+      IOException {
+    InodeTree.ResolveResult<FileSystem> res = 
+      fsState.resolve(getUriPath(f), true);
+    res.targetFileSystem.setTimes(res.remainingPath, mtime, atime); 
+  }
+
+  @Override
+  public void setVerifyChecksum(final boolean verifyChecksum) { 
+    // This is a file-system-level operation; however, ViewFileSystem 
+    // points to many file systems. Noop for ViewFileSystem.
+  }
+  
+  public MountPoint[] getMountPoints() {
+    List<InodeTree.MountPoint<FileSystem>> mountPoints = 
+                  fsState.getMountPoints();
+    
+    MountPoint[] result = new MountPoint[mountPoints.size()];
+    for ( int i = 0; i < mountPoints.size(); ++i ) {
+      result[i] = new MountPoint(new Path(mountPoints.get(i).src), 
+                              mountPoints.get(i).target.targetDirLinkList);
+    }
+    return result;
+  }
+  
+ 
+  @Override
+  public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
+    List<InodeTree.MountPoint<FileSystem>> mountPoints = 
+                fsState.getMountPoints();
+    int initialListSize  = 0;
+    for (InodeTree.MountPoint<FileSystem> im : mountPoints) {
+      initialListSize += im.target.targetDirLinkList.length; 
+    }
+    List<Token<?>> result = new ArrayList<Token<?>>(initialListSize);
+    for ( int i = 0; i < mountPoints.size(); ++i ) {
+      List<Token<?>> tokens = 
+        mountPoints.get(i).target.targetFileSystem.getDelegationTokens(renewer);
+      if (tokens != null) {
+        result.addAll(tokens);
+      }
+    }
+    return result;
+  }
+  
+  /*
+   * An instance of this class represents an internal dir of the viewFs,
+   * that is, an internal dir of the mount table.
+   * It is read only; create, mkdir and delete operations are not allowed.
+   * If called on create or mkdir then this target is the parent of the
+   * directory in which one is trying to create or mkdir; hence
+   * in this case the path name passed in is the last component.
+   * Otherwise this target is the end point of the path and hence
+   * the path name passed in is null.
+   */
+  static class InternalDirOfViewFs extends FileSystem {
+    final InodeTree.INodeDir<FileSystem>  theInternalDir;
+    final long creationTime; // of the mount table
+    final UserGroupInformation ugi; // the user/group of user who created mtable
+    final URI myUri;
+    
+    public InternalDirOfViewFs(final InodeTree.INodeDir<FileSystem> dir,
+        final long cTime, final UserGroupInformation ugi, URI uri)
+      throws URISyntaxException {
+      myUri = uri;
+      try {
+        initialize(myUri, new Configuration());
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot occur");
+      }
+      theInternalDir = dir;
+      creationTime = cTime;
+      this.ugi = ugi;
+    }
+
+    static private void checkPathIsSlash(final Path f) throws IOException {
+      if (f != InodeTree.SlashPath) {
+        throw new IOException (
+        "Internal implementation error: expected file name to be /" );
+      }
+    }
+    
+    @Override
+    public URI getUri() {
+      return myUri;
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      throw new RuntimeException (
+      "Internal impl error: getWorkingDir should not have been called" );
+    }
+
+    @Override
+    public void setWorkingDirectory(final Path new_dir) {
+      throw new RuntimeException (
+      "Internal impl error: setWorkingDir should not have been called" ); 
+    }
+
+    @Override
+    public FSDataOutputStream append(final Path f, final int bufferSize,
+        final Progressable progress) throws IOException {
+      throw readOnlyMountTable("append", f);
+    }
+
+    @Override
+    public FSDataOutputStream create(final Path f,
+        final FsPermission permission, final boolean overwrite,
+        final int bufferSize, final short replication, final long blockSize,
+        final Progressable progress) throws AccessControlException {
+      throw readOnlyMountTable("create", f);
+    }
+
+    @Override
+    public boolean delete(final Path f, final boolean recursive)
+        throws AccessControlException, IOException {
+      checkPathIsSlash(f);
+      throw readOnlyMountTable("delete", f);
+    }
+    
+    @Override
+    @SuppressWarnings("deprecation")
+    public boolean delete(final Path f)
+        throws AccessControlException, IOException {
+      return delete(f, true);
+    }
+
+    @Override
+    public BlockLocation[] getFileBlockLocations(final FileStatus fs,
+        final long start, final long len) throws 
+        FileNotFoundException, IOException {
+      checkPathIsSlash(fs.getPath());
+      throw new FileNotFoundException("Path points to dir not a file");
+    }
+
+    @Override
+    public FileChecksum getFileChecksum(final Path f)
+        throws FileNotFoundException, IOException {
+      checkPathIsSlash(f);
+      throw new FileNotFoundException("Path points to dir not a file");
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      checkPathIsSlash(f);
+      return new FileStatus(0, true, 0, 0, creationTime, creationTime,
+          PERMISSION_RRR, ugi.getUserName(), ugi.getGroupNames()[0],
+          new Path(theInternalDir.fullPath).makeQualified(
+              myUri, null));
+    }
+    
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws AccessControlException,
+        FileNotFoundException, IOException {
+      checkPathIsSlash(f);
+      FileStatus[] result = new FileStatus[theInternalDir.children.size()];
+      int i = 0;
+      for (Entry<String, INode<FileSystem>> iEntry : 
+                                          theInternalDir.children.entrySet()) {
+        INode<FileSystem> inode = iEntry.getValue();
+        if (inode instanceof INodeLink ) {
+          INodeLink<FileSystem> link = (INodeLink<FileSystem>) inode;
+
+          result[i++] = new FileStatus(0, false, 0, 0,
+            creationTime, creationTime, PERMISSION_RRR,
+            ugi.getUserName(), ugi.getGroupNames()[0],
+            link.getTargetLink(),
+            new Path(inode.fullPath).makeQualified(
+                myUri, null));
+        } else {
+          result[i++] = new FileStatus(0, true, 0, 0,
+            creationTime, creationTime, PERMISSION_RRR,
+            ugi.getUserName(), ugi.getGroupNames()[0],
+            new Path(inode.fullPath).makeQualified(
+                myUri, null));
+        }
+      }
+      return result;
+    }
+
+    @Override
+    public boolean mkdirs(Path dir, FsPermission permission)
+        throws AccessControlException, FileAlreadyExistsException {
+      if (theInternalDir.isRoot & dir == null) {
+        throw new FileAlreadyExistsException("/ already exits");
+      }
+      // Note dir starts with /
+      if (theInternalDir.children.containsKey(dir.toString().substring(1))) {
+        return true; // this is the stupid semantics of FileSystem
+      }
+      throw readOnlyMountTable("mkdirs",  dir);
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize)
+        throws AccessControlException, FileNotFoundException, IOException {
+      checkPathIsSlash(f);
+      throw new FileNotFoundException("Path points to dir not a file");
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws AccessControlException,
+        IOException {
+      checkPathIsSlash(src);
+      checkPathIsSlash(dst);
+      throw readOnlyMountTable("rename", src);     
+    }
+
+    @Override
+    public void setOwner(Path f, String username, String groupname)
+        throws AccessControlException, IOException {
+      checkPathIsSlash(f);
+      throw readOnlyMountTable("setOwner", f);
+    }
+
+    @Override
+    public void setPermission(Path f, FsPermission permission)
+        throws AccessControlException, IOException {
+      checkPathIsSlash(f);
+      throw readOnlyMountTable("setPermission", f);    
+    }
+
+    @Override
+    public boolean setReplication(Path f, short replication)
+        throws AccessControlException, IOException {
+      checkPathIsSlash(f);
+      throw readOnlyMountTable("setReplication", f);
+    }
+
+    @Override
+    public void setTimes(Path f, long mtime, long atime)
+        throws AccessControlException, IOException {
+      checkPathIsSlash(f);
+      throw readOnlyMountTable("setTimes", f);    
+    }
+
+    @Override
+    public void setVerifyChecksum(boolean verifyChecksum) {
+      // Noop for viewfs
+    }
+  }
+}