Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:51:50 UTC

svn commit: r1077206 [1/2] - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/ src/we...

Author: omalley
Date: Fri Mar  4 03:51:49 2011
New Revision: 1077206

URL: http://svn.apache.org/viewvc?rev=1077206&view=rev
Log:
commit 86c3885268798eebf5271cd1a0ec735a734d68ef
Author: Hairong Kuang <ha...@ucdev21.inktomisearch.com>
Date:   Tue Feb 23 20:28:10 2010 +0000

    HDFS-946 from http://issues.apache.org/jira/secure/attachment/12436753/HdfsFileStatus-yahoo20.patch.
    
    +++ b/YAHOO-CHANGES.txt
    +    HDFS-946. NameNode should not return full path name when listing a
    +    directory or getting the status of a file. (hairong)
    +
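
For illustration, a minimal sketch (not part of this patch) of what the new
contract looks like from the client side: getListing() now returns only each
entry's local name, and callers rebuild absolute paths from the parent they
already know, as DistributedFileSystem.makeQualified does in the diff below.
The helper class and names here are hypothetical.

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

    /** Hypothetical helper showing the HDFS-946 contract. */
    class ListingExample {
      static void printListing(ClientProtocol namenode, Path parent)
          throws IOException {
        HdfsFileStatus[] listing = namenode.getListing(parent.toString());
        if (listing == null) return;                 // path does not exist
        for (HdfsFileStatus s : listing) {
          // s.getLocalName() is e.g. "data.txt", never "/user/x/data.txt"
          Path full = s.getFullPath(parent);         // client-side join
          System.out.println(full + (s.isDir() ? " (dir)" : " (file)"));
        }
      }
    }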

Added:
    hadoop/common/branches/branch-0.20-security-patches/HdfsFileStatus-yahoo20.patch
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestFileStatus.java
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/datanode/browseDirectory.jsp

Added: hadoop/common/branches/branch-0.20-security-patches/HdfsFileStatus-yahoo20.patch
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/HdfsFileStatus-yahoo20.patch?rev=1077206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/HdfsFileStatus-yahoo20.patch (added)
+++ hadoop/common/branches/branch-0.20-security-patches/HdfsFileStatus-yahoo20.patch Fri Mar  4 03:51:49 2011
@@ -0,0 +1,1257 @@
+diff --git src/hdfs/org/apache/hadoop/hdfs/DFSClient.java src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
+index ec6f700..6a1eec1 100644
+--- src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
++++ src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
+@@ -530,7 +530,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
+   OutputStream append(String src, int buffersize, Progressable progress
+       ) throws IOException {
+     checkOpen();
+-    FileStatus stat = null;
++    HdfsFileStatus stat = null;
+     LocatedBlock lastBlock = null;
+     try {
+       stat = getFileInfo(src);
+@@ -613,10 +613,10 @@ public class DFSClient implements FSConstants, java.io.Closeable {
+     return getFileInfo(src) != null;
+   }
+ 
+-  /** @deprecated Use getFileStatus() instead */
++  /** @deprecated Use getHdfsFileStatus() instead */
+   @Deprecated
+   public boolean isDirectory(String src) throws IOException {
+-    FileStatus fs = getFileInfo(src);
++    HdfsFileStatus fs = getFileInfo(src);
+     if (fs != null)
+       return fs.isDir();
+     else
+@@ -625,7 +625,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
+ 
+   /**
+    */
+-  public FileStatus[] listPaths(String src) throws IOException {
++  public HdfsFileStatus[] listPaths(String src) throws IOException {
+     checkOpen();
+     try {
+       return namenode.getListing(src);
+@@ -634,7 +634,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
+     }
+   }
+ 
+-  public FileStatus getFileInfo(String src) throws IOException {
++  public HdfsFileStatus getFileInfo(String src) throws IOException {
+     checkOpen();
+     try {
+       return namenode.getFileInfo(src);
+@@ -2818,7 +2818,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
+      * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
+      */
+     DFSOutputStream(String src, int buffersize, Progressable progress,
+-        LocatedBlock lastBlock, FileStatus stat,
++        LocatedBlock lastBlock, HdfsFileStatus stat,
+         int bytesPerChecksum) throws IOException {
+       this(src, stat.getBlockSize(), progress, bytesPerChecksum);
+       initialFileSize = stat.getLen(); // length of file when opened
+diff --git src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java
+index 4f29b83..b351814 100644
+--- src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java
++++ src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java
+@@ -19,6 +19,8 @@
+ package org.apache.hadoop.hdfs;
+ 
+ import java.io.IOException;
++import java.io.UnsupportedEncodingException;
++
+ import org.apache.hadoop.conf.Configuration;
+ import java.util.StringTokenizer;
+ import org.apache.hadoop.fs.Path;
+@@ -65,5 +67,29 @@ public class DFSUtil {
+     String user = conf.get(userNameKey, System.getProperty("user.name"));
+     UserGroupInformation.loginUserFromKeytab(user, keytabFilename);
+   }
++  
++  /**
++   * Converts a byte array to a string using UTF8 encoding.
++   */
++  public static String bytes2String(byte[] bytes) {
++    try {
++      return new String(bytes, "UTF8");
++    } catch(UnsupportedEncodingException e) {
++      assert false : "UTF8 encoding is not supported ";
++    }
++    return null;
++  }
++
++  /**
++   * Converts a string to a byte array using UTF8 encoding.
++   */
++  public static byte[] string2Bytes(String str) {
++    try {
++      return str.getBytes("UTF8");
++    } catch(UnsupportedEncodingException e) {
++      assert false : "UTF8 encoding is not supported ";
++    }
++    return null;
++  }
+ }
+ 
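
As a side note on the two helpers added to DFSUtil above: a hypothetical
round trip (not part of the patch) showing the conversion is lossless for
ordinary strings.

    import org.apache.hadoop.hdfs.DFSUtil;

    /** Hypothetical round trip through the new UTF-8 helpers. */
    class Utf8RoundTrip {
      public static void main(String[] args) {
        byte[] raw = DFSUtil.string2Bytes("filestatus.dat"); // String -> UTF-8 bytes
        String back = DFSUtil.bytes2String(raw);             // UTF-8 bytes -> String
        assert "filestatus.dat".equals(back);                // run with -ea
      }
    }
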
+diff --git src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
+index 31d289e..fa15e17 100644
+--- src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
++++ src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
+@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+ import org.apache.hadoop.hdfs.protocol.FSConstants;
+ import org.apache.hadoop.hdfs.protocol.Block;
++import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+ import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+ import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+@@ -244,20 +245,20 @@ public class DistributedFileSystem extends FileSystem {
+     dfs.setQuota(getPathName(src), namespaceQuota, diskspaceQuota);
+   }
+   
+-  private FileStatus makeQualified(FileStatus f) {
++  private FileStatus makeQualified(HdfsFileStatus f, Path parent) {
+     return new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
+         f.getBlockSize(), f.getModificationTime(),
+         f.getAccessTime(),
+         f.getPermission(), f.getOwner(), f.getGroup(),
+-        f.getPath().makeQualified(this)); // fully-qualify path
++        f.getFullPath(parent).makeQualified(this)); // fully-qualify path
+   }
+ 
+   public FileStatus[] listStatus(Path p) throws IOException {
+-    FileStatus[] infos = dfs.listPaths(getPathName(p));
++    HdfsFileStatus[] infos = dfs.listPaths(getPathName(p));
+     if (infos == null) return null;
+     FileStatus[] stats = new FileStatus[infos.length];
+     for (int i = 0; i < infos.length; i++) {
+-      stats[i] = makeQualified(infos[i]);
++      stats[i] = makeQualified(infos[i], p);
+     }
+     return stats;
+   }
+@@ -454,9 +455,9 @@ public class DistributedFileSystem extends FileSystem {
+    * @throws FileNotFoundException if the file does not exist.
+    */
+   public FileStatus getFileStatus(Path f) throws IOException {
+-    FileStatus fi = dfs.getFileInfo(getPathName(f));
++    HdfsFileStatus fi = dfs.getFileInfo(getPathName(f));
+     if (fi != null) {
+-      return makeQualified(fi);
++      return makeQualified(fi, f);
+     } else {
+       throw new FileNotFoundException("File does not exist: " + f);
+     }
+diff --git src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+index dabec11..1f1d2bb 100644
+--- src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
++++ src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+@@ -22,11 +22,11 @@ import java.io.*;
+ import org.apache.hadoop.ipc.VersionedProtocol;
+ import org.apache.hadoop.security.AccessControlException;
+ import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
++import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+ import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+ import org.apache.hadoop.fs.permission.*;
+ import org.apache.hadoop.hdfs.DFSConfigKeys;
+ import org.apache.hadoop.fs.ContentSummary;
+-import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.security.KerberosInfo;
+ import org.apache.hadoop.security.token.Token;
+@@ -49,9 +49,10 @@ public interface ClientProtocol extends VersionedProtocol {
+    * Compared to the previous version the following changes have been introduced:
+    * (Only the latest change is reflected.
+    * The log of historical changes can be retrieved from the svn).
+-   * 43: Adding Delegation Token related APIs
++   * 44: getFileInfo returns HdfsFileStatus;
++   *     getListing returns HdfsFileStatus[].
+    */
+-  public static final long versionID = 43L;
++  public static final long versionID = 44L;
+   
+   ///////////////////////////////////////
+   // File contents
+@@ -266,7 +267,7 @@ public interface ClientProtocol extends VersionedProtocol {
+   /**
+    * Get a listing of the indicated directory
+    */
+-  public FileStatus[] getListing(String src) throws IOException;
++  public HdfsFileStatus[] getListing(String src) throws IOException;
+ 
+   ///////////////////////////////////////
+   // System issues and management
+@@ -434,7 +435,7 @@ public interface ClientProtocol extends VersionedProtocol {
+    * @return object containing information regarding the file
+    *         or null if file not found
+    */
+-  public FileStatus getFileInfo(String src) throws IOException;
++  public HdfsFileStatus getFileInfo(String src) throws IOException;
+ 
+   /**
+    * Get {@link ContentSummary} rooted at the specified directory.
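
The versionID bump above (43 -> 44) is what keeps old clients from silently
misparsing the new wire types. A hypothetical sketch (not part of the patch)
of the kind of check this enables, using the inherited
VersionedProtocol.getProtocolVersion():

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;

    /** Hypothetical version check against a ClientProtocol proxy. */
    class VersionCheckExample {
      static void checkVersion(ClientProtocol namenode) throws IOException {
        long server = namenode.getProtocolVersion(
            ClientProtocol.class.getName(), ClientProtocol.versionID);
        if (server != ClientProtocol.versionID) {   // 44 after this patch
          throw new IOException("Incompatible ClientProtocol: server="
              + server + ", client=" + ClientProtocol.versionID);
        }
      }
    }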
+diff --git src/hdfs/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java src/hdfs/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
+new file mode 100644
+index 0000000..fb84bc9
+--- /dev/null
++++ src/hdfs/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
+@@ -0,0 +1,236 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hdfs.protocol;
++
++import java.io.DataInput;
++import java.io.DataOutput;
++import java.io.IOException;
++
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.fs.permission.FsPermission;
++import org.apache.hadoop.hdfs.DFSUtil;
++import org.apache.hadoop.io.Text;
++import org.apache.hadoop.io.Writable;
++
++/** Interface that represents the over the wire information for a file.
++ */
++public class HdfsFileStatus implements Writable {
++
++  private byte[] path;  // local name of the inode that's encoded in java UTF8
++  private long length;
++  private boolean isdir;
++  private short block_replication;
++  private long blocksize;
++  private long modification_time;
++  private long access_time;
++  private FsPermission permission;
++  private String owner;
++  private String group;
++  
++  public static final byte[] EMPTY_NAME = new byte[0];
++
++  /**
++   * default constructor
++   */
++  public HdfsFileStatus() { this(0, false, 0, 0, 0, 0, null, null, null, null); }
++  
++  /**
++   * Constructor
++   * @param length the number of bytes the file has
++   * @param isdir if the path is a directory
++   * @param block_replication the replication factor
++   * @param blocksize the block size
++   * @param modification_time modification time
++   * @param access_time access time
++   * @param permission permission
++   * @param owner the owner of the path
++   * @param group the group of the path
++   * @param path the local name in java UTF8 encoding, the same as the in-memory representation
++   */
++  public HdfsFileStatus(long length, boolean isdir, int block_replication,
++                    long blocksize, long modification_time, long access_time,
++                    FsPermission permission, String owner, String group, 
++                    byte[] path) {
++    this.length = length;
++    this.isdir = isdir;
++    this.block_replication = (short)block_replication;
++    this.blocksize = blocksize;
++    this.modification_time = modification_time;
++    this.access_time = access_time;
++    this.permission = (permission == null) ? 
++                      FsPermission.getDefault() : permission;
++    this.owner = (owner == null) ? "" : owner;
++    this.group = (group == null) ? "" : group;
++    this.path = path;
++  }
++
++  /**
++   * Get the length of this file, in bytes.
++   * @return the length of this file, in bytes.
++   */
++  final public long getLen() {
++    return length;
++  }
++
++  /**
++   * Is this a directory?
++   * @return true if this is a directory
++   */
++  final public boolean isDir() {
++    return isdir;
++  }
++
++  /**
++   * Get the block size of the file.
++   * @return the number of bytes
++   */
++  final public long getBlockSize() {
++    return blocksize;
++  }
++
++  /**
++   * Get the replication factor of a file.
++   * @return the replication factor of a file.
++   */
++  final public short getReplication() {
++    return block_replication;
++  }
++
++  /**
++   * Get the modification time of the file.
++   * @return the modification time of file in milliseconds since January 1, 1970 UTC.
++   */
++  final public long getModificationTime() {
++    return modification_time;
++  }
++
++  /**
++   * Get the access time of the file.
++   * @return the access time of file in milliseconds since January 1, 1970 UTC.
++   */
++  final public long getAccessTime() {
++    return access_time;
++  }
++
++  /**
++   * Get FsPermission associated with the file.
++   * @return permission
++   */
++  final public FsPermission getPermission() {
++    return permission;
++  }
++  
++  /**
++   * Get the owner of the file.
++   * @return owner of the file
++   */
++  final public String getOwner() {
++    return owner;
++  }
++  
++  /**
++   * Get the group associated with the file.
++   * @return group for the file. 
++   */
++  final public String getGroup() {
++    return group;
++  }
++  
++  /**
++   * Check if the local name is empty
++   * @return true if the name is empty
++   */
++  final public boolean isEmptyLocalName() {
++    return path.length == 0;
++  }
++
++  /**
++   * Get the string representation of the local name
++   * @return the local name in string
++   */
++  final public String getLocalName() {
++    return DFSUtil.bytes2String(path);
++  }
++  
++  /**
++   * Get the string representation of the full path name
++   * @param parent the parent path
++   * @return the full path in string
++   */
++  final public String getFullName(final String parent) {
++    if (isEmptyLocalName()) {
++      return parent;
++    }
++    
++    StringBuilder fullName = new StringBuilder(parent);
++    if (!parent.endsWith(Path.SEPARATOR)) {
++      fullName.append(Path.SEPARATOR);
++    }
++    fullName.append(getLocalName());
++    return fullName.toString();
++  }
++
++  /**
++   * Get the full path
++   * @param parent the parent path
++   * @return the full path
++   */
++  final public Path getFullPath(final Path parent) {
++    if (isEmptyLocalName()) {
++      return parent;
++    }
++    
++    return new Path(parent, getLocalName());
++  }
++
++  //////////////////////////////////////////////////
++  // Writable
++  //////////////////////////////////////////////////
++  public void write(DataOutput out) throws IOException {
++    out.writeInt(path.length);
++    out.write(path);
++    out.writeLong(length);
++    out.writeBoolean(isdir);
++    out.writeShort(block_replication);
++    out.writeLong(blocksize);
++    out.writeLong(modification_time);
++    out.writeLong(access_time);
++    permission.write(out);
++    Text.writeString(out, owner);
++    Text.writeString(out, group);
++  }
++
++  public void readFields(DataInput in) throws IOException {
++    int numOfBytes = in.readInt();
++    if (numOfBytes == 0) {
++      this.path = EMPTY_NAME;
++    } else {
++      this.path = new byte[numOfBytes];
++      in.readFully(path);
++    }
++    this.length = in.readLong();
++    this.isdir = in.readBoolean();
++    this.block_replication = in.readShort();
++    blocksize = in.readLong();
++    modification_time = in.readLong();
++    access_time = in.readLong();
++    permission.readFields(in);
++    owner = Text.readString(in);
++    group = Text.readString(in);
++  }
++}
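
To make the Writable format above concrete, a minimal round trip through
write()/readFields() (a hypothetical test-style sketch, not part of the
patch):

    import java.io.*;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.DFSUtil;
    import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

    /** Hypothetical serialization round trip for HdfsFileStatus. */
    class HdfsFileStatusRoundTrip {
      public static void main(String[] args) throws IOException {
        HdfsFileStatus in = new HdfsFileStatus(1024L, false, 3, 64L << 20,
            0L, 0L, FsPermission.getDefault(), "hairong", "users",
            DFSUtil.string2Bytes("data.txt")); // local name only, per HDFS-946

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buf));   // path length, path, then attrs

        HdfsFileStatus out = new HdfsFileStatus();
        out.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));
        assert out.getLen() == 1024L && "data.txt".equals(out.getLocalName());
      }
    }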
+diff --git src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+index 7130b4c..71f8d9d 100644
+--- src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
++++ src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+@@ -21,7 +21,6 @@ import java.io.*;
+ import java.util.*;
+ 
+ import org.apache.hadoop.conf.Configuration;
+-import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.ContentSummary;
+ import org.apache.hadoop.fs.permission.*;
+@@ -30,6 +29,7 @@ import org.apache.hadoop.metrics.MetricsUtil;
+ import org.apache.hadoop.metrics.MetricsContext;
+ import org.apache.hadoop.hdfs.protocol.FSConstants;
+ import org.apache.hadoop.hdfs.protocol.Block;
++import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+ import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+ import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
+@@ -706,7 +706,7 @@ class FSDirectory implements FSConstants, Closeable {
+    * This function is admittedly very inefficient right now.  We'll
+    * make it better later.
+    */
+-  FileStatus[] getListing(String src) {
++  HdfsFileStatus[] getListing(String src) {
+     String srcs = normalizePath(src);
+ 
+     synchronized (rootDir) {
+@@ -714,15 +714,14 @@ class FSDirectory implements FSConstants, Closeable {
+       if (targetNode == null)
+         return null;
+       if (!targetNode.isDirectory()) {
+-        return new FileStatus[]{createFileStatus(srcs, targetNode)};
++        return new HdfsFileStatus[]{createFileStatus(
++            HdfsFileStatus.EMPTY_NAME, targetNode)};
+       }
+       List<INode> contents = ((INodeDirectory)targetNode).getChildren();
+-      FileStatus listing[] = new FileStatus[contents.size()];
+-      if(! srcs.endsWith(Path.SEPARATOR))
+-        srcs += Path.SEPARATOR;
++      HdfsFileStatus listing[] = new HdfsFileStatus[contents.size()];
+       int i = 0;
+       for (INode cur : contents) {
+-        listing[i] = createFileStatus(srcs+cur.getLocalName(), cur);
++        listing[i] = createFileStatus(cur.name, cur);
+         i++;
+       }
+       return listing;
+@@ -734,7 +733,7 @@ class FSDirectory implements FSConstants, Closeable {
+    * @return object containing information regarding the file
+    *         or null if file not found
+    */
+-  FileStatus getFileInfo(String src) {
++  HdfsFileStatus getFileInfo(String src) {
+     String srcs = normalizePath(src);
+     synchronized (rootDir) {
+       INode targetNode = rootDir.getNode(srcs);
+@@ -742,7 +741,7 @@ class FSDirectory implements FSConstants, Closeable {
+         return null;
+       }
+       else {
+-        return createFileStatus(srcs, targetNode);
++        return createFileStatus(HdfsFileStatus.EMPTY_NAME, targetNode);
+       }
+     }
+   }
+@@ -1351,9 +1350,10 @@ class FSDirectory implements FSConstants, Closeable {
+   /**
+    * Create FileStatus by file INode 
+    */
+-   private static FileStatus createFileStatus(String path, INode node) {
++   private static HdfsFileStatus createFileStatus(byte[] path, INode node) {
+     // length is zero for directories
+-    return new FileStatus(node.isDirectory() ? 0 : node.computeContentSummary().getLength(), 
++    return new HdfsFileStatus(
++        node.isDirectory() ? 0 : ((INodeFile)node).computeContentSummary().getLength(), 
+         node.isDirectory(), 
+         node.isDirectory() ? 0 : ((INodeFile)node).getReplication(), 
+         node.isDirectory() ? 0 : ((INodeFile)node).getPreferredBlockSize(),
+@@ -1362,6 +1362,6 @@ class FSDirectory implements FSConstants, Closeable {
+         node.getFsPermission(),
+         node.getUserName(),
+         node.getGroupName(),
+-        new Path(path));
++        path);
+   }
+ }
+diff --git src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+index fe04470..8d6b902 100644
+--- src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
++++ src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+@@ -36,12 +36,12 @@ import java.nio.ByteBuffer;
+ import org.apache.hadoop.hdfs.protocol.Block;
+ import org.apache.hadoop.hdfs.protocol.DatanodeID;
+ import org.apache.hadoop.hdfs.protocol.FSConstants;
++import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+ import org.apache.hadoop.hdfs.server.common.Storage;
+ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+ import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
+ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+ import org.apache.hadoop.io.*;
+-import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.permission.*;
+ 
+ /**
+@@ -654,7 +654,7 @@ public class FSEditLog {
+           String s = FSImage.readString(in);
+           String d = FSImage.readString(in);
+           timestamp = readLong(in);
+-          FileStatus dinfo = fsDir.getFileInfo(d);
++          HdfsFileStatus dinfo = fsDir.getFileInfo(d);
+           fsDir.unprotectedRenameTo(s, d, timestamp);
+           fsNamesys.changeLease(s, d, dinfo);
+           break;
+diff --git src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+index 35e406d..f3fbe22 100644
+--- src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
++++ src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.permission.PermissionStatus;
+ import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hdfs.DFSUtil;
+ import org.apache.hadoop.hdfs.protocol.Block;
+ import org.apache.hadoop.hdfs.protocol.DatanodeID;
+ import org.apache.hadoop.hdfs.protocol.FSConstants;
+@@ -133,7 +134,7 @@ public class FSImage extends Storage {
+    * Used for saving the image to disk
+    */
+   static private final FsPermission FILE_PERM = new FsPermission((short)0);
+-  static private final byte[] PATH_SEPARATOR = INode.string2Bytes(Path.SEPARATOR);
++  static private final byte[] PATH_SEPARATOR = DFSUtil.string2Bytes(Path.SEPARATOR);
+ 
+   /**
+    */
+diff --git src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+index c9f4709..5cbf356 100644
+--- src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
++++ src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+@@ -54,7 +54,6 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
+ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+ import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+ import org.apache.hadoop.fs.ContentSummary;
+-import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.permission.*;
+ import org.apache.hadoop.ipc.Server;
+@@ -109,7 +108,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+ 
+   private static final void logAuditEvent(UserGroupInformation ugi,
+       InetAddress addr, String cmd, String src, String dst,
+-      FileStatus stat) {
++      HdfsFileStatus stat) {
+     final Formatter fmt = auditFormatter.get();
+     ((StringBuilder)fmt.out()).setLength(0);
+     auditLog.info(fmt.format(AUDIT_FORMAT, ugi, addr, cmd, src, dst,
+@@ -751,7 +750,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+     dir.setPermission(src, permission);
+     getEditLog().logSync();
+     if (auditLog.isInfoEnabled()) {
+-      final FileStatus stat = dir.getFileInfo(src);
++      final HdfsFileStatus stat = dir.getFileInfo(src);
+       logAuditEvent(UserGroupInformation.getCurrentUser(),
+                     Server.getRemoteIp(),
+                     "setPermission", src, null, stat);
+@@ -777,7 +776,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+     dir.setOwner(src, username, group);
+     getEditLog().logSync();
+     if (auditLog.isInfoEnabled()) {
+-      final FileStatus stat = dir.getFileInfo(src);
++      final HdfsFileStatus stat = dir.getFileInfo(src);
+       logAuditEvent(UserGroupInformation.getCurrentUser(),
+                     Server.getRemoteIp(),
+                     "setOwner", src, null, stat);
+@@ -937,7 +936,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+     if (inode != null) {
+       dir.setTimes(src, inode, mtime, atime, true);
+       if (auditLog.isInfoEnabled()) {
+-        final FileStatus stat = dir.getFileInfo(src);
++        final HdfsFileStatus stat = dir.getFileInfo(src);
+         logAuditEvent(UserGroupInformation.getCurrentUser(),
+                       Server.getRemoteIp(),
+                       "setTimes", src, null, stat);
+@@ -1052,7 +1051,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+                       replication, blockSize);
+     getEditLog().logSync();
+     if (auditLog.isInfoEnabled()) {
+-      final FileStatus stat = dir.getFileInfo(src);
++      final HdfsFileStatus stat = dir.getFileInfo(src);
+       logAuditEvent(UserGroupInformation.getCurrentUser(),
+                     Server.getRemoteIp(),
+                     "create", src, null, stat);
+@@ -1714,7 +1713,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+     boolean status = renameToInternal(src, dst);
+     getEditLog().logSync();
+     if (status && auditLog.isInfoEnabled()) {
+-      final FileStatus stat = dir.getFileInfo(dst);
++      final HdfsFileStatus stat = dir.getFileInfo(dst);
+       logAuditEvent(UserGroupInformation.getCurrentUser(),
+                     Server.getRemoteIp(),
+                     "rename", src, dst, stat);
+@@ -1740,7 +1739,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+       checkAncestorAccess(actualdst, FsAction.WRITE);
+     }
+ 
+-    FileStatus dinfo = dir.getFileInfo(dst);
++    HdfsFileStatus dinfo = dir.getFileInfo(dst);
+     if (dir.renameTo(src, dst)) {
+       changeLease(src, dst, dinfo);     // update lease with new filename
+       return true;
+@@ -1799,7 +1798,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+    * @return object containing information regarding the file
+    *         or null if file not found
+    */
+-  FileStatus getFileInfo(String src) throws IOException {
++  HdfsFileStatus getFileInfo(String src) throws IOException {
+     if (isPermissionEnabled) {
+       checkTraverse(src);
+     }
+@@ -1814,7 +1813,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+     boolean status = mkdirsInternal(src, permissions);
+     getEditLog().logSync();
+     if (status && auditLog.isInfoEnabled()) {
+-      final FileStatus stat = dir.getFileInfo(src);
++      final HdfsFileStatus stat = dir.getFileInfo(src);
+       logAuditEvent(UserGroupInformation.getCurrentUser(),
+                     Server.getRemoteIp(),
+                     "mkdirs", src, null, stat);
+@@ -2060,7 +2059,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+    * Get a listing of all files at 'src'.  The Object[] array
+    * exists so we can return file attributes (soon to be implemented)
+    */
+-  public FileStatus[] getListing(String src) throws IOException {
++  public HdfsFileStatus[] getListing(String src) throws IOException {
+     if (isPermissionEnabled) {
+       if (dir.isDir(src)) {
+         checkPathAccess(src, FsAction.READ_EXECUTE);
+@@ -4841,7 +4840,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+   // rename was successful. If any part of the renamed subtree had
+   // files that were being written to, update with new filename.
+   //
+-  void changeLease(String src, String dst, FileStatus dinfo) 
++  void changeLease(String src, String dst, HdfsFileStatus dinfo) 
+                    throws IOException {
+     String overwrite;
+     String replaceBy;
+diff --git src/hdfs/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
+index 93a1d75..061f5b7 100644
+--- src/hdfs/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
++++ src/hdfs/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
+@@ -24,10 +24,11 @@ import java.security.PrivilegedExceptionAction;
+ import javax.servlet.http.HttpServletRequest;
+ import javax.servlet.http.HttpServletResponse;
+ 
+-import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+ import org.apache.hadoop.hdfs.protocol.DatanodeID;
+ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
++import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+ import org.apache.hadoop.security.UserGroupInformation;
+ 
+@@ -37,11 +38,11 @@ import org.apache.hadoop.security.UserGroupInformation;
+ public class FileDataServlet extends DfsServlet {
+ 
+   /** Create a redirection URI */
+-  protected URI createUri(FileStatus i, UserGroupInformation ugi,
++  protected URI createUri(String parent, HdfsFileStatus i, UserGroupInformation ugi,
+       ClientProtocol nnproxy, HttpServletRequest request)
+       throws IOException, URISyntaxException {
+     String scheme = request.getScheme();
+-    final DatanodeID host = pickSrcDatanode(i, nnproxy);
++    final DatanodeID host = pickSrcDatanode(parent, i, nnproxy);
+     final String hostname;
+     if (host instanceof DatanodeInfo) {
+       hostname = ((DatanodeInfo)host).getHostName();
+@@ -52,7 +53,7 @@ public class FileDataServlet extends DfsServlet {
+         "https".equals(scheme)
+           ? (Integer)getServletContext().getAttribute("datanode.https.port")
+           : host.getInfoPort(),
+-        "/streamFile", "filename=" + i.getPath() + 
++        "/streamFile", "filename=" + i.getFullName(parent) + 
+         "&ugi=" + ugi.getShortUserName(), null);
+   }
+ 
+@@ -62,7 +63,7 @@ public class FileDataServlet extends DfsServlet {
+    * Currently, this looks at no more than the first five blocks of a file,
+    * selecting a datanode randomly from the most represented.
+    */
+-  private static DatanodeID pickSrcDatanode(FileStatus i,
++  private static DatanodeID pickSrcDatanode(String parent, HdfsFileStatus i,
+       ClientProtocol nnproxy) throws IOException {
+     // a race condition can happen by initializing a static member this way.
+     // A proper fix should make JspHelper a singleton. Since it doesn't affect 
+@@ -70,7 +71,7 @@ public class FileDataServlet extends DfsServlet {
+     if (jspHelper == null)
+       jspHelper = new JspHelper();
+     final LocatedBlocks blks = nnproxy.getBlockLocations(
+-        i.getPath().toUri().getPath(), 0, 1);
++        i.getFullPath(new Path(parent)).toUri().getPath(), 0, 1);
+     if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) {
+       // pick a random datanode
+       return jspHelper.randomNode();
+@@ -101,9 +102,9 @@ public class FileDataServlet extends DfsServlet {
+       final String path = request.getPathInfo() != null ? 
+                                                     request.getPathInfo() : "/";
+       
+-      FileStatus info = nnproxy.getFileInfo(path);
++      HdfsFileStatus info = nnproxy.getFileInfo(path);
+       if ((info != null) && !info.isDir()) {
+-        response.sendRedirect(createUri(info, ugi, nnproxy,
++        response.sendRedirect(createUri(path, info, ugi, nnproxy,
+               request).toURL().toString());
+       } else if (info == null){
+         response.sendError(400, "cat: File not found " + path);
+diff --git src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java
+index cba34fa..02c9dbc 100644
+--- src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java
++++ src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java
+@@ -17,13 +17,13 @@
+  */
+ package org.apache.hadoop.hdfs.server.namenode;
+ 
+-import java.io.UnsupportedEncodingException;
+ import java.util.Arrays;
+ import java.util.List;
+ 
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.ContentSummary;
+ import org.apache.hadoop.fs.permission.*;
++import org.apache.hadoop.hdfs.DFSUtil;
+ import org.apache.hadoop.hdfs.protocol.Block;
+ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+@@ -221,7 +221,7 @@ abstract class INode implements Comparable<byte[]> {
+    * @return local file name
+    */
+   String getLocalName() {
+-    return bytes2String(name);
++    return DFSUtil.bytes2String(name);
+   }
+ 
+   /**
+@@ -236,7 +236,7 @@ abstract class INode implements Comparable<byte[]> {
+    * Set local file name
+    */
+   void setLocalName(String name) {
+-    this.name = string2Bytes(name);
++    this.name = DFSUtil.string2Bytes(name);
+   }
+ 
+   /**
+@@ -324,7 +324,7 @@ abstract class INode implements Comparable<byte[]> {
+     }
+     byte[][] bytes = new byte[strings.length][];
+     for (int i = 0; i < strings.length; i++)
+-      bytes[i] = string2Bytes(strings[i]);
++      bytes[i] = DFSUtil.string2Bytes(strings[i]);
+     return bytes;
+   }
+ 
+@@ -393,30 +393,6 @@ abstract class INode implements Comparable<byte[]> {
+     }
+     return len1 - len2;
+   }
+-
+-  /**
+-   * Converts a byte array to a string using UTF8 encoding.
+-   */
+-  static String bytes2String(byte[] bytes) {
+-    try {
+-      return new String(bytes, "UTF8");
+-    } catch(UnsupportedEncodingException e) {
+-      assert false : "UTF8 encoding is not supported ";
+-    }
+-    return null;
+-  }
+-
+-  /**
+-   * Converts a string to a byte array using UTF8 encoding.
+-   */
+-  static byte[] string2Bytes(String str) {
+-    try {
+-      return str.getBytes("UTF8");
+-    } catch(UnsupportedEncodingException e) {
+-      assert false : "UTF8 encoding is not supported ";
+-    }
+-    return null;
+-  }
+   
+   
+   LocatedBlocks createLocatedBlocks(List<LocatedBlock> blocks) {
+diff --git src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
+index 6c6a570..16ec2a1 100644
+--- src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
++++ src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
+@@ -25,6 +25,7 @@ import java.util.List;
+ import org.apache.hadoop.fs.permission.FsAction;
+ import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.fs.permission.PermissionStatus;
++import org.apache.hadoop.hdfs.DFSUtil;
+ import org.apache.hadoop.hdfs.protocol.Block;
+ 
+ /**
+@@ -95,7 +96,7 @@ class INodeDirectory extends INode {
+   }
+   
+   INode getChild(String name) {
+-    return getChildINode(string2Bytes(name));
++    return getChildINode(DFSUtil.string2Bytes(name));
+   }
+ 
+   private INode getChildINode(byte[] name) {
+diff --git src/hdfs/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java
+index d9077c5..c1157be 100644
+--- src/hdfs/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java
++++ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java
+@@ -17,9 +17,10 @@
+  */
+ package org.apache.hadoop.hdfs.server.namenode;
+ 
+-import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hdfs.HftpFileSystem;
+ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
++import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+ import org.apache.hadoop.ipc.RemoteException;
+ import org.apache.hadoop.util.VersionInfo;
+ 
+@@ -59,10 +60,10 @@ public class ListPathsServlet extends DfsServlet {
+    * Node information includes path, modification, permission, owner and group.
+    * For files, it also includes size, replication and block-size. 
+    */
+-  static void writeInfo(FileStatus i, XMLOutputter doc) throws IOException {
++  static void writeInfo(String parent, HdfsFileStatus i, XMLOutputter doc) throws IOException {
+     final SimpleDateFormat ldf = df.get();
+     doc.startTag(i.isDir() ? "directory" : "file");
+-    doc.attribute("path", i.getPath().toUri().getPath());
++    doc.attribute("path", i.getFullPath(new Path(parent)).toUri().getPath());
+     doc.attribute("modified", ldf.format(new Date(i.getModificationTime())));
+     doc.attribute("accesstime", ldf.format(new Date(i.getAccessTime())));
+     if (!i.isDir()) {
+@@ -148,9 +149,9 @@ public class ListPathsServlet extends DfsServlet {
+         doc.attribute(m.getKey(), m.getValue());
+       }
+ 
+-      FileStatus base = nnproxy.getFileInfo(path);
++      HdfsFileStatus base = nnproxy.getFileInfo(path);
+       if ((base != null) && base.isDir()) {
+-        writeInfo(base, doc);
++        writeInfo(path, base, doc);
+       }
+ 
+       Stack<String> pathstack = new Stack<String>();
+@@ -158,20 +159,21 @@ public class ListPathsServlet extends DfsServlet {
+       while (!pathstack.empty()) {
+         String p = pathstack.pop();
+         try {
+-          FileStatus[] listing = nnproxy.getListing(p);
++          HdfsFileStatus[] listing = nnproxy.getListing(p);
+           if (listing == null) {
+             LOG.warn("ListPathsServlet - Path " + p + " does not exist");
+             continue;
+           }
+-          for (FileStatus i : listing) {
+-            if (exclude.matcher(i.getPath().getName()).matches()
+-                || !filter.matcher(i.getPath().getName()).matches()) {
++          for (HdfsFileStatus i : listing) {
++            String localName = i.getLocalName();
++            if (exclude.matcher(localName).matches()
++                || !filter.matcher(localName).matches()) {
+               continue;
+             }
+             if (recur && i.isDir()) {
+-              pathstack.push(i.getPath().toUri().getPath());
++              pathstack.push(new Path(p, localName).toUri().getPath());
+             }
+-            writeInfo(i, doc);
++            writeInfo(p, i, doc);
+           }
+         }
+         catch(RemoteException re) {re.writeXml(p, doc);}
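
The servlet change above is the pattern every caller now follows: keep the
parent path around and join it with each entry's local name. A stripped-down,
hypothetical version of the same walk (not part of the patch):

    import java.io.IOException;
    import java.util.Stack;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

    /** Hypothetical recursive walk mirroring ListPathsServlet's loop. */
    class WalkExample {
      static void walk(ClientProtocol nn, String root) throws IOException {
        Stack<String> pathstack = new Stack<String>();
        pathstack.push(root);
        while (!pathstack.empty()) {
          String p = pathstack.pop();
          HdfsFileStatus[] listing = nn.getListing(p);
          if (listing == null) continue;           // path does not exist
          for (HdfsFileStatus i : listing) {
            // Rebuild the child's absolute path: parent + local name.
            String child = i.getFullPath(new Path(p)).toUri().getPath();
            System.out.println(child);
            if (i.isDir()) pathstack.push(child);
          }
        }
      }
    }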
+diff --git src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+index f92954e..961a5f6 100644
+--- src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
++++ src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.namenode;
+ import org.apache.commons.logging.*;
+ 
+ import org.apache.hadoop.fs.ContentSummary;
+-import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.Trash;
+ import org.apache.hadoop.fs.FileSystem;
+@@ -584,8 +583,8 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
+ 
+   /**
+    */
+-  public FileStatus[] getListing(String src) throws IOException {
+-    FileStatus[] files = namesystem.getListing(src);
++  public HdfsFileStatus[] getListing(String src) throws IOException {
++    HdfsFileStatus[] files = namesystem.getListing(src);
+     if (files != null) {
+       myMetrics.numGetListingOps.inc();
+     }
+@@ -599,7 +598,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
+    * @return object containing information regarding the file
+    *         or null if file not found
+    */
+-  public FileStatus getFileInfo(String src)  throws IOException {
++  public HdfsFileStatus getFileInfo(String src)  throws IOException {
+     myMetrics.numFileInfoOps.inc();
+     return namesystem.getFileInfo(src);
+   }
+diff --git src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+index 462e86d..18f3088 100644
+--- src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
++++ src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+@@ -32,11 +32,11 @@ import java.util.TreeSet;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+-import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.hdfs.DFSClient;
+ import org.apache.hadoop.hdfs.protocol.Block;
+ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
++import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+ import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+@@ -140,10 +140,10 @@ public class NamenodeFsck {
+     try {
+       Result res = new Result(conf);
+ 
+-      final FileStatus[] files = namenode.getListing(path);
++      final HdfsFileStatus[] files = namenode.getListing(path);
+       if (files != null) {
+         for (int i = 0; i < files.length; i++) {
+-          check(files[i], res);
++          check(path, files[i], res);
+         }
+         out.println(res);
+         out.println(" Number of data-nodes:\t\t" + totalDatanodes);
+@@ -170,12 +170,12 @@ public class NamenodeFsck {
+     }
+   }
+   
+-  private void check(FileStatus file, Result res) throws IOException {
+-    String path = file.getPath().toString();
++  private void check(String parent, HdfsFileStatus file, Result res) throws IOException {
++    String path = file.getFullName(parent);
+     boolean isOpen = false;
+ 
+     if (file.isDir()) {
+-      final FileStatus[] files = namenode.getListing(path);
++      final HdfsFileStatus[] files = namenode.getListing(path);
+       if (files == null) {
+         return;
+       }
+@@ -184,7 +184,7 @@ public class NamenodeFsck {
+       }
+       res.totalDirs++;
+       for (int i = 0; i < files.length; i++) {
+-        check(files[i], res);
++        check(path, files[i], res);
+       }
+       return;
+     }
+@@ -303,7 +303,7 @@ public class NamenodeFsck {
+         break;
+       case FIXING_MOVE:
+         if (!isOpen)
+-          lostFoundMove(file, blocks);
++          lostFoundMove(parent, file, blocks);
+         break;
+       case FIXING_DELETE:
+         if (!isOpen)
+@@ -322,7 +322,7 @@ public class NamenodeFsck {
+     }
+   }
+   
+-  private void lostFoundMove(FileStatus file, LocatedBlocks blocks)
++  private void lostFoundMove(String parent, HdfsFileStatus file, LocatedBlocks blocks)
+     throws IOException {
+     final DFSClient dfs = new DFSClient(NameNode.getAddress(conf), conf);
+     try {
+@@ -332,8 +332,9 @@ public class NamenodeFsck {
+     if (!lfInitedOk) {
+       return;
+     }
+-    String target = lostFound + file.getPath();
+-    String errmsg = "Failed to move " + file.getPath() + " to /lost+found";
++    String fullName = file.getFullName(parent);
++    String target = lostFound + fullName;
++    String errmsg = "Failed to move " + fullName + " to /lost+found";
+     try {
+       if (!namenode.mkdirs(target, file.getPermission())) {
+         LOG.warn(errmsg);
+@@ -377,8 +378,8 @@ public class NamenodeFsck {
+         }
+       }
+       if (fos != null) fos.close();
+-      LOG.warn("\n - moved corrupted file " + file.getPath() + " to /lost+found");
+-      dfs.delete(file.getPath().toString(), true);
++      LOG.warn("\n - moved corrupted file " + fullName + " to /lost+found");
++      dfs.delete(fullName, true);
+     }  catch (Exception e) {
+       e.printStackTrace();
+       LOG.warn(errmsg + ": " + e.getMessage());
+diff --git src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+index 3e449af..d661497 100644
+--- src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
++++ src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+@@ -189,7 +189,7 @@ public class TestDFSClientRetries extends TestCase {
+ 
+     public boolean mkdirs(String src, FsPermission masked) throws IOException { return false; }
+ 
+-    public FileStatus[] getListing(String src) throws IOException { return null; }
++    public HdfsFileStatus[] getListing(String src) throws IOException { return null; }
+ 
+     public void renewLease(String clientName) throws IOException {}
+ 
+@@ -213,7 +213,7 @@ public class TestDFSClientRetries extends TestCase {
+ 
+     public void metaSave(String filename) throws IOException {}
+ 
+-    public FileStatus getFileInfo(String src) throws IOException { return null; }
++    public HdfsFileStatus getFileInfo(String src) throws IOException { return null; }
+ 
+     public ContentSummary getContentSummary(String path) throws IOException { return null; }
+ 
+diff --git src/test/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java src/test/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
+index bfaf86a..c32d7c3 100644
+--- src/test/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
++++ src/test/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
+@@ -28,9 +28,9 @@ import java.util.zip.CRC32;
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSInputStream;
+-import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileUtil;
+ import org.apache.hadoop.hdfs.protocol.FSConstants;
++import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+ import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+@@ -130,11 +130,11 @@ public class TestDFSUpgradeFromImage extends TestCase {
+   private void verifyDir(DFSClient client, String dir) 
+                                            throws IOException {
+     
+-    FileStatus[] fileArr = client.listPaths(dir);
++    HdfsFileStatus[] fileArr = client.listPaths(dir);
+     TreeMap<String, Boolean> fileMap = new TreeMap<String, Boolean>();
+     
+-    for(FileStatus file : fileArr) {
+-      String path = file.getPath().toString();
++    for(HdfsFileStatus file : fileArr) {
++      String path = file.getFullName(dir);
+       fileMap.put(path, Boolean.valueOf(file.isDir()));
+     }
+     
+diff --git src/test/org/apache/hadoop/hdfs/TestFileStatus.java src/test/org/apache/hadoop/hdfs/TestFileStatus.java
+index 523b5fd..81be0ce 100644
+--- src/test/org/apache/hadoop/hdfs/TestFileStatus.java
++++ src/test/org/apache/hadoop/hdfs/TestFileStatus.java
+@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.FSDataOutputStream;
++import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+ import org.apache.hadoop.hdfs.server.namenode.NameNode;
+ 
+ /**
+@@ -81,7 +82,7 @@ public class TestFileStatus extends TestCase {
+                  fs.getFileStatus(path).isDir() == true);
+       
+       // make sure getFileInfo returns null for files which do not exist
+-      FileStatus fileInfo = dfsClient.getFileInfo("/noSuchFile");
++      HdfsFileStatus fileInfo = dfsClient.getFileInfo("/noSuchFile");
+       assertTrue(fileInfo == null);
+ 
+       // create a file in home directory
+@@ -91,21 +92,48 @@ public class TestFileStatus extends TestCase {
+       System.out.println("Created file filestatus.dat with one "
+                          + " replicas.");
+       checkFile(fs, file1, 1);
+-      assertTrue(file1 + " should be a file", 
+-                  fs.getFileStatus(file1).isDir() == false);
+-      assertTrue(fs.getFileStatus(file1).getBlockSize() == blockSize);
+-      assertTrue(fs.getFileStatus(file1).getReplication() == 1);
+-      assertTrue(fs.getFileStatus(file1).getLen() == fileSize);
+       System.out.println("Path : \"" + file1 + "\"");
+ 
++      // test getFileStatus on a file
++      FileStatus status = fs.getFileStatus(file1);
++      assertTrue(file1 + " should be a file", 
++          status.isDir() == false);
++      assertTrue(status.getBlockSize() == blockSize);
++      assertTrue(status.getReplication() == 1);
++      assertTrue(status.getLen() == fileSize);
++      assertEquals(fs.makeQualified(file1), 
++          status.getPath().toString());
++
++      // test listStatus on a file
++      FileStatus[] stats = fs.listStatus(file1);
++      assertEquals(1, stats.length);
++      status = stats[0];
++      assertTrue(file1 + " should be a file", 
++          status.isDir() == false);
++      assertTrue(status.getBlockSize() == blockSize);
++      assertTrue(status.getReplication() == 1);
++      assertTrue(status.getLen() == fileSize);
++      assertEquals(fs.makeQualified(file1).toString(), 
++          status.getPath().toString());
++
+       // create an empty directory
+       //
+       Path parentDir = new Path("/test");
+       Path dir = new Path("/test/mkdirs");
+       assertTrue(fs.mkdirs(dir));
+       assertTrue(fs.exists(dir));
+-      assertTrue(dir + " should be a directory", 
+-                 fs.getFileStatus(path).isDir() == true);
++      System.out.println("Dir : \"" + dir + "\"");
++
++      // test getFileStatus on an empty directory
++      status = fs.getFileStatus(dir);
++      assertTrue(dir + " should be a directory", status.isDir());
++      assertTrue(dir + " should be zero size ", status.getLen() == 0);
++      assertEquals(fs.makeQualified(dir).toString(), 
++          status.getPath().toString());
++
++      // test listStatus on an empty directory
++      stats = fs.listStatus(dir);
++      assertEquals(dir + " should be empty", 0, stats.length);
+       assertTrue(dir + " should be zero size ",
+                  fs.getContentSummary(dir).getLength() == 0);
+       assertTrue(dir + " should be zero size ",
+@@ -114,7 +142,7 @@ public class TestFileStatus extends TestCase {
+ 
+       // create another file that is smaller than a block.
+       //
+-      Path file2 = new Path("/test/mkdirs/filestatus2.dat");
++      Path file2 = new Path(dir, "filestatus2.dat");
+       writeFile(fs, file2, 1, blockSize/4, blockSize);
+       System.out.println("Created file filestatus2.dat with one "
+                          + " replicas.");
+@@ -122,11 +150,14 @@ public class TestFileStatus extends TestCase {
+       System.out.println("Path : \"" + file2 + "\"");
+ 
+       // verify file attributes
+-      assertTrue(fs.getFileStatus(file2).getBlockSize() == blockSize);
+-      assertTrue(fs.getFileStatus(file2).getReplication() == 1);
++      status = fs.getFileStatus(file2);
++      assertTrue(status.getBlockSize() == blockSize);
++      assertTrue(status.getReplication() == 1);
++      assertEquals(fs.makeQualified(file2).toString(), 
++          status.getPath().toString());
+ 
+       // create another file in the same directory
+-      Path file3 = new Path("/test/mkdirs/filestatus3.dat");
++      Path file3 = new Path(dir, "filestatus3.dat");
+       writeFile(fs, file3, 1, blockSize/4, blockSize);
+       System.out.println("Created file filestatus3.dat with one "
+                          + " replicas.");
+@@ -136,7 +167,17 @@ public class TestFileStatus extends TestCase {
+       // of the two files
+       assertTrue(dir + " size should be " + (blockSize/2), 
+                  blockSize/2 == fs.getContentSummary(dir).getLength());
+-    } finally {
++       
++       // test listStatus on a non-empty directory
++       stats = fs.listStatus(dir);
++       assertEquals(dir + " should have two entries", 2, stats.length);
++       String qualifiedFile2 = fs.makeQualified(file2).toString();
++       String qualifiedFile3 = fs.makeQualified(file3).toString();
++       for(FileStatus stat:stats) {
++         String statusFullName = stat.getPath().toString();
++         assertTrue(qualifiedFile2.equals(statusFullName)
++           || qualifiedFile3.toString().equals(statusFullName));
++       }    } finally {
+       fs.close();
+       cluster.shutdown();
+     }
+diff --git src/webapps/datanode/browseDirectory.jsp src/webapps/datanode/browseDirectory.jsp
+index e577dc5..ee1defd 100644
+--- src/webapps/datanode/browseDirectory.jsp
++++ src/webapps/datanode/browseDirectory.jsp
+@@ -76,7 +76,7 @@
+         return;
+       }
+       // directory
+-      FileStatus[] files = dfs.listPaths(target);
++      HdfsFileStatus[] files = dfs.listPaths(target);
+       //generate a table and dump the info
+       String [] headings = { "Name", "Type", "Size", "Replication", 
+                               "Block Size", "Modification Time",
+@@ -104,7 +104,8 @@
+         String cols [] = new String[headings.length];
+         for (int i = 0; i < files.length; i++) {
+           //Get the location of the first block of the file
+-          if (files[i].getPath().toString().endsWith(".crc")) continue;
++          String localname = files[i].getLocalName();
++          if (localname.endsWith(".crc")) continue;
+           if (!files[i].isDir()) {
+             cols[1] = "file";
+             cols[2] = StringUtils.byteDesc(files[i].getLen());
+@@ -118,9 +119,9 @@
+             cols[4] = "";
+           }
+           String datanodeUrl = req.getRequestURL()+"?dir="+
+-              URLEncoder.encode(files[i].getPath().toString(), "UTF-8") + 
++              URLEncoder.encode(files[i].getFullName(target), "UTF-8") + 
+               "&namenodeInfoPort=" + namenodeInfoPort;
+-          cols[0] = "<a href=\""+datanodeUrl+"\">"+files[i].getPath().getName()+"</a>";
++          cols[0] = "<a href=\""+datanodeUrl+"\">"+localname+"</a>";
+           cols[5] = FsShell.dateForm.format(new Date((files[i].getModificationTime())));
+           cols[6] = files[i].getPermission().toString();
+           cols[7] = files[i].getOwner();

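For illustration, a minimal sketch (not part of the patch; the class and
helper names are made up) of the pattern the JSP change relies on: listing
entries now carry only local names, so a caller that remembers which
directory it listed rebuilds each child's full path with getFullName before
encoding it into a link.

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

public class BrowseLinkSketch {
  // Builds the "dir" query parameter for one listing entry, given the
  // directory that was listed (the JSP's "target").
  static String childUrl(String requestUrl, String target,
                         HdfsFileStatus entry, int namenodeInfoPort)
      throws UnsupportedEncodingException {
    String fullName = entry.getFullName(target); // target + "/" + local name
    return requestUrl + "?dir=" + URLEncoder.encode(fullName, "UTF-8")
        + "&namenodeInfoPort=" + namenodeInfoPort;
  }
}
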
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077206&r1=1077205&r2=1077206&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar  4 03:51:49 2011
@@ -530,7 +530,7 @@ public class DFSClient implements FSCons
   OutputStream append(String src, int buffersize, Progressable progress
       ) throws IOException {
     checkOpen();
-    FileStatus stat = null;
+    HdfsFileStatus stat = null;
     LocatedBlock lastBlock = null;
     try {
       stat = getFileInfo(src);
@@ -613,10 +613,10 @@ public class DFSClient implements FSCons
     return getFileInfo(src) != null;
   }
 
-  /** @deprecated Use getFileStatus() instead */
+  /** @deprecated Use getFileInfo() instead */
   @Deprecated
   public boolean isDirectory(String src) throws IOException {
-    FileStatus fs = getFileInfo(src);
+    HdfsFileStatus fs = getFileInfo(src);
     if (fs != null)
       return fs.isDir();
     else
@@ -625,7 +625,7 @@ public class DFSClient implements FSCons
 
   /**
    */
-  public FileStatus[] listPaths(String src) throws IOException {
+  public HdfsFileStatus[] listPaths(String src) throws IOException {
     checkOpen();
     try {
       return namenode.getListing(src);
@@ -634,7 +634,7 @@ public class DFSClient implements FSCons
     }
   }
 
-  public FileStatus getFileInfo(String src) throws IOException {
+  public HdfsFileStatus getFileInfo(String src) throws IOException {
     checkOpen();
     try {
       return namenode.getFileInfo(src);
@@ -2818,7 +2818,7 @@ public class DFSClient implements FSCons
      * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
      */
     DFSOutputStream(String src, int buffersize, Progressable progress,
-        LocatedBlock lastBlock, FileStatus stat,
+        LocatedBlock lastBlock, HdfsFileStatus stat,
         int bytesPerChecksum) throws IOException {
       this(src, stat.getBlockSize(), progress, bytesPerChecksum);
       initialFileSize = stat.getLen(); // length of file when opened

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java?rev=1077206&r1=1077205&r2=1077206&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java Fri Mar  4 03:51:49 2011
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
 import org.apache.hadoop.conf.Configuration;
 import java.util.StringTokenizer;
 import org.apache.hadoop.fs.Path;
@@ -65,5 +67,29 @@ public class DFSUtil {
     String user = conf.get(userNameKey, System.getProperty("user.name"));
     UserGroupInformation.loginUserFromKeytab(user, keytabFilename);
   }
+  
+  /**
+   * Converts a byte array to a string using UTF8 encoding.
+   */
+  public static String bytes2String(byte[] bytes) {
+    try {
+      return new String(bytes, "UTF8");
+    } catch(UnsupportedEncodingException e) {
+      assert false : "UTF8 encoding is not supported ";
+    }
+    return null;
+  }
+
+  /**
+   * Converts a string to a byte array using UTF8 encoding.
+   */
+  public static byte[] string2Bytes(String str) {
+    try {
+      return str.getBytes("UTF8");
+    } catch(UnsupportedEncodingException e) {
+      assert false : "UTF8 encoding is not supported ";
+    }
+    return null;
+  }
 }
 

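A quick round-trip check of the two new helpers (a sketch, not part of the
patch; the class name is made up):

import org.apache.hadoop.hdfs.DFSUtil;

public class Utf8RoundTripSketch {
  public static void main(String[] args) {
    String name = "filestatus2.dat";
    byte[] raw = DFSUtil.string2Bytes(name);  // encode to UTF-8 bytes
    String back = DFSUtil.bytes2String(raw);  // decode back to a String
    // The conversion is lossless for any valid Java string.
    System.out.println(name.equals(back));    // true
  }
}
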
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1077206&r1=1077205&r2=1077206&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Mar  4 03:51:49 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
@@ -244,20 +245,20 @@ public class DistributedFileSystem exten
     dfs.setQuota(getPathName(src), namespaceQuota, diskspaceQuota);
   }
   
-  private FileStatus makeQualified(FileStatus f) {
+  private FileStatus makeQualified(HdfsFileStatus f, Path parent) {
     return new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
         f.getBlockSize(), f.getModificationTime(),
         f.getAccessTime(),
         f.getPermission(), f.getOwner(), f.getGroup(),
-        f.getPath().makeQualified(this)); // fully-qualify path
+        f.getFullPath(parent).makeQualified(this)); // fully-qualify path
   }
 
   public FileStatus[] listStatus(Path p) throws IOException {
-    FileStatus[] infos = dfs.listPaths(getPathName(p));
+    HdfsFileStatus[] infos = dfs.listPaths(getPathName(p));
     if (infos == null) return null;
     FileStatus[] stats = new FileStatus[infos.length];
     for (int i = 0; i < infos.length; i++) {
-      stats[i] = makeQualified(infos[i]);
+      stats[i] = makeQualified(infos[i], p);
     }
     return stats;
   }
@@ -454,9 +455,9 @@ public class DistributedFileSystem exten
    * @throws FileNotFoundException if the file does not exist.
    */
   public FileStatus getFileStatus(Path f) throws IOException {
-    FileStatus fi = dfs.getFileInfo(getPathName(f));
+    HdfsFileStatus fi = dfs.getFileInfo(getPathName(f));
     if (fi != null) {
-      return makeQualified(fi);
+      return makeQualified(fi, f);
     } else {
       throw new FileNotFoundException("File does not exist: " + f);
     }

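From a client's point of view nothing changes: listStatus and getFileStatus
still return fully-qualified paths, because DistributedFileSystem now joins
each entry's local name with the path it queried. A hedged usage sketch
(assumes a reachable cluster configured via fs.default.name; the path is
arbitrary):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListStatusSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path dir = new Path("/test/mkdirs");
    for (FileStatus stat : fs.listStatus(dir)) {
      // Still fully qualified, e.g. hdfs://host:port/test/mkdirs/f.dat,
      // even though the wire carried only "f.dat".
      System.out.println(stat.getPath());
    }
    fs.close();
  }
}
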
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1077206&r1=1077205&r2=1077206&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Mar  4 03:51:49 2011
@@ -22,11 +22,11 @@ import java.io.*;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.token.Token;
@@ -49,9 +49,10 @@ public interface ClientProtocol extends 
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 43: Adding Delegation Token related APIs
+   * 44: getFileInfo returns HdfsFileStatus;
+   *     getListing returns HdfsFileStatus[].
    */
-  public static final long versionID = 43L;
+  public static final long versionID = 44L;
   
   ///////////////////////////////////////
   // File contents
@@ -266,7 +267,7 @@ public interface ClientProtocol extends 
   /**
    * Get a listing of the indicated directory
    */
-  public FileStatus[] getListing(String src) throws IOException;
+  public HdfsFileStatus[] getListing(String src) throws IOException;
 
   ///////////////////////////////////////
   // System issues and management
@@ -434,7 +435,7 @@ public interface ClientProtocol extends 
    * @return object containing information regarding the file
    *         or null if file not found
    */
-  public FileStatus getFileInfo(String src) throws IOException;
+  public HdfsFileStatus getFileInfo(String src) throws IOException;
 
   /**
    * Get {@link ContentSummary} rooted at the specified directory.

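Since both return types change, the protocol version is bumped from 43 to
44, so mismatched clients and NameNodes fail fast rather than misparse
responses. A simplified illustration of the handshake's effect (a sketch;
the real check lives in the IPC layer, not in user code):

import java.io.IOException;
import org.apache.hadoop.ipc.RPC;

public class VersionCheckSketch {
  // Mirrors what the RPC machinery does when a proxy is created.
  static void check(long clientVersion, long serverVersion) throws IOException {
    if (clientVersion != serverVersion) {
      throw new RPC.VersionMismatch(
          "org.apache.hadoop.hdfs.protocol.ClientProtocol",
          clientVersion, serverVersion);
    }
  }
}
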
Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java?rev=1077206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java Fri Mar  4 03:51:49 2011
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/** Class that represents the over-the-wire information for a file.
+ */
+public class HdfsFileStatus implements Writable {
+
+  private byte[] path;  // local name of the inode that's encoded in java UTF8
+  private long length;
+  private boolean isdir;
+  private short block_replication;
+  private long blocksize;
+  private long modification_time;
+  private long access_time;
+  private FsPermission permission;
+  private String owner;
+  private String group;
+  
+  public static final byte[] EMPTY_NAME = new byte[0];
+
+  /**
+   * default constructor
+   */
+  public HdfsFileStatus() { this(0, false, 0, 0, 0, 0, null, null, null, null); }
+  
+  /**
+   * Constructor
+   * @param length the number of bytes the file has
+   * @param isdir if the path is a directory
+   * @param block_replication the replication factor
+   * @param blocksize the block size
+   * @param modification_time modification time
+   * @param access_time access time
+   * @param permission permission
+   * @param owner the owner of the path
+   * @param group the group of the path
+   * @param path the local name, in Java UTF-8 encoding, matching the in-memory representation
+   */
+  public HdfsFileStatus(long length, boolean isdir, int block_replication,
+                    long blocksize, long modification_time, long access_time,
+                    FsPermission permission, String owner, String group, 
+                    byte[] path) {
+    this.length = length;
+    this.isdir = isdir;
+    this.block_replication = (short)block_replication;
+    this.blocksize = blocksize;
+    this.modification_time = modification_time;
+    this.access_time = access_time;
+    this.permission = (permission == null) ? 
+                      FsPermission.getDefault() : permission;
+    this.owner = (owner == null) ? "" : owner;
+    this.group = (group == null) ? "" : group;
+    this.path = path;
+  }
+
+  /**
+   * Get the length of this file, in bytes.
+   * @return the length of this file, in bytes.
+   */
+  final public long getLen() {
+    return length;
+  }
+
+  /**
+   * Is this a directory?
+   * @return true if this is a directory
+   */
+  final public boolean isDir() {
+    return isdir;
+  }
+
+  /**
+   * Get the block size of the file.
+   * @return the number of bytes
+   */
+  final public long getBlockSize() {
+    return blocksize;
+  }
+
+  /**
+   * Get the replication factor of a file.
+   * @return the replication factor of a file.
+   */
+  final public short getReplication() {
+    return block_replication;
+  }
+
+  /**
+   * Get the modification time of the file.
+   * @return the modification time of the file in milliseconds since January 1, 1970 UTC.
+   */
+  final public long getModificationTime() {
+    return modification_time;
+  }
+
+  /**
+   * Get the access time of the file.
+   * @return the access time of the file in milliseconds since January 1, 1970 UTC.
+   */
+  final public long getAccessTime() {
+    return access_time;
+  }
+
+  /**
+   * Get FsPermission associated with the file.
+   * @return permission
+   */
+  final public FsPermission getPermission() {
+    return permission;
+  }
+  
+  /**
+   * Get the owner of the file.
+   * @return owner of the file
+   */
+  final public String getOwner() {
+    return owner;
+  }
+  
+  /**
+   * Get the group associated with the file.
+   * @return group for the file. 
+   */
+  final public String getGroup() {
+    return group;
+  }
+  
+  /**
+   * Check if the local name is empty
+   * @return true if the name is empty
+   */
+  final public boolean isEmptyLocalName() {
+    return path.length == 0;
+  }
+
+  /**
+   * Get the string representation of the local name
+   * @return the local name as a string
+   */
+  final public String getLocalName() {
+    return DFSUtil.bytes2String(path);
+  }
+  
+  /**
+   * Get the string representation of the full path name
+   * @param parent the parent path
+   * @return the full path as a string
+   */
+  final public String getFullName(final String parent) {
+    if (isEmptyLocalName()) {
+      return parent;
+    }
+    
+    StringBuilder fullName = new StringBuilder(parent);
+    if (!parent.endsWith(Path.SEPARATOR)) {
+      fullName.append(Path.SEPARATOR);
+    }
+    fullName.append(getLocalName());
+    return fullName.toString();
+  }
+
+  /**
+   * Get the full path
+   * @param parent the parent path
+   * @return the full path
+   */
+  final public Path getFullPath(final Path parent) {
+    if (isEmptyLocalName()) {
+      return parent;
+    }
+    
+    return new Path(parent, getLocalName());
+  }
+
+  //////////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////////
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(path.length);
+    out.write(path);
+    out.writeLong(length);
+    out.writeBoolean(isdir);
+    out.writeShort(block_replication);
+    out.writeLong(blocksize);
+    out.writeLong(modification_time);
+    out.writeLong(access_time);
+    permission.write(out);
+    Text.writeString(out, owner);
+    Text.writeString(out, group);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    int numOfBytes = in.readInt();
+    if (numOfBytes == 0) {
+      this.path = EMPTY_NAME;
+    } else {
+      this.path = new byte[numOfBytes];
+      in.readFully(path);
+    }
+    this.length = in.readLong();
+    this.isdir = in.readBoolean();
+    this.block_replication = in.readShort();
+    blocksize = in.readLong();
+    modification_time = in.readLong();
+    access_time = in.readLong();
+    permission.readFields(in);
+    owner = Text.readString(in);
+    group = Text.readString(in);
+  }
+}

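To make the wire format concrete, a serialization round trip of the new
class (a sketch; the field values are arbitrary):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

public class WireFormatSketch {
  public static void main(String[] args) throws IOException {
    HdfsFileStatus before = new HdfsFileStatus(
        1024L, false, 3, 64L * 1024 * 1024,       // len, isdir, repl, block size
        0L, 0L,                                    // mtime, atime
        FsPermission.getDefault(), "user", "group",
        DFSUtil.string2Bytes("filestatus2.dat"));  // local name only

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    before.write(new DataOutputStream(buf));       // name length, name, fields

    HdfsFileStatus after = new HdfsFileStatus();
    after.readFields(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(after.getLocalName());      // filestatus2.dat
  }
}
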
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1077206&r1=1077205&r2=1077206&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Mar  4 03:51:49 2011
@@ -21,7 +21,6 @@ import java.io.*;
 import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.permission.*;
@@ -30,6 +29,7 @@ import org.apache.hadoop.metrics.Metrics
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
@@ -706,7 +706,7 @@ class FSDirectory implements FSConstants
    * This function is admittedly very inefficient right now.  We'll
    * make it better later.
    */
-  FileStatus[] getListing(String src) {
+  HdfsFileStatus[] getListing(String src) {
     String srcs = normalizePath(src);
 
     synchronized (rootDir) {
@@ -714,15 +714,14 @@ class FSDirectory implements FSConstants
       if (targetNode == null)
         return null;
       if (!targetNode.isDirectory()) {
-        return new FileStatus[]{createFileStatus(srcs, targetNode)};
+        return new HdfsFileStatus[]{createFileStatus(
+            HdfsFileStatus.EMPTY_NAME, targetNode)};
       }
       List<INode> contents = ((INodeDirectory)targetNode).getChildren();
-      FileStatus listing[] = new FileStatus[contents.size()];
-      if(! srcs.endsWith(Path.SEPARATOR))
-        srcs += Path.SEPARATOR;
+      HdfsFileStatus listing[] = new HdfsFileStatus[contents.size()];
       int i = 0;
       for (INode cur : contents) {
-        listing[i] = createFileStatus(srcs+cur.getLocalName(), cur);
+        listing[i] = createFileStatus(cur.name, cur);
         i++;
       }
       return listing;
@@ -734,7 +733,7 @@ class FSDirectory implements FSConstants
    * @return object containing information regarding the file
    *         or null if file not found
    */
-  FileStatus getFileInfo(String src) {
+  HdfsFileStatus getFileInfo(String src) {
     String srcs = normalizePath(src);
     synchronized (rootDir) {
       INode targetNode = rootDir.getNode(srcs);
@@ -742,7 +741,7 @@ class FSDirectory implements FSConstants
         return null;
       }
       else {
-        return createFileStatus(srcs, targetNode);
+        return createFileStatus(HdfsFileStatus.EMPTY_NAME, targetNode);
       }
     }
   }
@@ -1351,9 +1350,10 @@ class FSDirectory implements FSConstants
   /**
    * Create FileStatus by file INode 
    */
-   private static FileStatus createFileStatus(String path, INode node) {
+   private static HdfsFileStatus createFileStatus(byte[] path, INode node) {
     // length is zero for directories
-    return new FileStatus(node.isDirectory() ? 0 : node.computeContentSummary().getLength(), 
+    return new HdfsFileStatus(
+        node.isDirectory() ? 0 : ((INodeFile)node).computeContentSummary().getLength(), 
         node.isDirectory(), 
         node.isDirectory() ? 0 : ((INodeFile)node).getReplication(), 
         node.isDirectory() ? 0 : ((INodeFile)node).getPreferredBlockSize(),
@@ -1362,6 +1362,6 @@ class FSDirectory implements FSConstants
         node.getFsPermission(),
         node.getUserName(),
         node.getGroupName(),
-        new Path(path));
+        path);
   }
 }

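One subtlety of the new getListing contract: when the listed target is
itself a file, the single returned entry has an empty local name
(HdfsFileStatus.EMPTY_NAME), meaning "the path you asked about". A small
sketch of how a caller resolves names under that convention (the helper
name is illustrative):

import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

public class EntryNameSketch {
  // Returns the name to display for one listing entry, given the path
  // that was passed to getListing.
  static String displayName(String listedPath, HdfsFileStatus entry) {
    // Empty local name => the entry describes listedPath itself.
    return entry.isEmptyLocalName() ? listedPath : entry.getLocalName();
  }
}
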
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1077206&r1=1077205&r2=1077206&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Mar  4 03:51:49 2011
@@ -36,12 +36,12 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.permission.*;
 
 /**
@@ -654,7 +654,7 @@ public class FSEditLog {
           String s = FSImage.readString(in);
           String d = FSImage.readString(in);
           timestamp = readLong(in);
-          FileStatus dinfo = fsDir.getFileInfo(d);
+          HdfsFileStatus dinfo = fsDir.getFileInfo(d);
           fsDir.unprotectedRenameTo(s, d, timestamp);
           fsNamesys.changeLease(s, d, dinfo);
           break;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1077206&r1=1077205&r2=1077206&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Mar  4 03:51:49 2011
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
@@ -133,7 +134,7 @@ public class FSImage extends Storage {
    * Used for saving the image to disk
    */
   static private final FsPermission FILE_PERM = new FsPermission((short)0);
-  static private final byte[] PATH_SEPARATOR = INode.string2Bytes(Path.SEPARATOR);
+  static private final byte[] PATH_SEPARATOR = DFSUtil.string2Bytes(Path.SEPARATOR);
 
   /**
    */

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1077206&r1=1077205&r2=1077206&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Mar  4 03:51:49 2011
@@ -54,7 +54,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.ipc.Server;
@@ -109,7 +108,7 @@ public class FSNamesystem implements FSC
 
   private static final void logAuditEvent(UserGroupInformation ugi,
       InetAddress addr, String cmd, String src, String dst,
-      FileStatus stat) {
+      HdfsFileStatus stat) {
     final Formatter fmt = auditFormatter.get();
     ((StringBuilder)fmt.out()).setLength(0);
     auditLog.info(fmt.format(AUDIT_FORMAT, ugi, addr, cmd, src, dst,
@@ -751,7 +750,7 @@ public class FSNamesystem implements FSC
     dir.setPermission(src, permission);
     getEditLog().logSync();
     if (auditLog.isInfoEnabled()) {
-      final FileStatus stat = dir.getFileInfo(src);
+      final HdfsFileStatus stat = dir.getFileInfo(src);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "setPermission", src, null, stat);
@@ -777,7 +776,7 @@ public class FSNamesystem implements FSC
     dir.setOwner(src, username, group);
     getEditLog().logSync();
     if (auditLog.isInfoEnabled()) {
-      final FileStatus stat = dir.getFileInfo(src);
+      final HdfsFileStatus stat = dir.getFileInfo(src);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "setOwner", src, null, stat);
@@ -937,7 +936,7 @@ public class FSNamesystem implements FSC
     if (inode != null) {
       dir.setTimes(src, inode, mtime, atime, true);
       if (auditLog.isInfoEnabled()) {
-        final FileStatus stat = dir.getFileInfo(src);
+        final HdfsFileStatus stat = dir.getFileInfo(src);
         logAuditEvent(UserGroupInformation.getCurrentUser(),
                       Server.getRemoteIp(),
                       "setTimes", src, null, stat);
@@ -1052,7 +1051,7 @@ public class FSNamesystem implements FSC
                       replication, blockSize);
     getEditLog().logSync();
     if (auditLog.isInfoEnabled()) {
-      final FileStatus stat = dir.getFileInfo(src);
+      final HdfsFileStatus stat = dir.getFileInfo(src);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "create", src, null, stat);
@@ -1714,7 +1713,7 @@ public class FSNamesystem implements FSC
     boolean status = renameToInternal(src, dst);
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled()) {
-      final FileStatus stat = dir.getFileInfo(dst);
+      final HdfsFileStatus stat = dir.getFileInfo(dst);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "rename", src, dst, stat);
@@ -1740,7 +1739,7 @@ public class FSNamesystem implements FSC
       checkAncestorAccess(actualdst, FsAction.WRITE);
     }
 
-    FileStatus dinfo = dir.getFileInfo(dst);
+    HdfsFileStatus dinfo = dir.getFileInfo(dst);
     if (dir.renameTo(src, dst)) {
       changeLease(src, dst, dinfo);     // update lease with new filename
       return true;
@@ -1799,7 +1798,7 @@ public class FSNamesystem implements FSC
    * @return object containing information regarding the file
    *         or null if file not found
    */
-  FileStatus getFileInfo(String src) throws IOException {
+  HdfsFileStatus getFileInfo(String src) throws IOException {
     if (isPermissionEnabled) {
       checkTraverse(src);
     }
@@ -1814,7 +1813,7 @@ public class FSNamesystem implements FSC
     boolean status = mkdirsInternal(src, permissions);
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled()) {
-      final FileStatus stat = dir.getFileInfo(src);
+      final HdfsFileStatus stat = dir.getFileInfo(src);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "mkdirs", src, null, stat);
@@ -2060,7 +2059,7 @@ public class FSNamesystem implements FSC
    * Get a listing of all files at 'src'.  The Object[] array
    * exists so we can return file attributes (soon to be implemented)
    */
-  public FileStatus[] getListing(String src) throws IOException {
+  public HdfsFileStatus[] getListing(String src) throws IOException {
     if (isPermissionEnabled) {
       if (dir.isDir(src)) {
         checkPathAccess(src, FsAction.READ_EXECUTE);
@@ -4841,7 +4840,7 @@ public class FSNamesystem implements FSC
   // rename was successful. If any part of the renamed subtree had
   // files that were being written to, update with new filename.
   //
-  void changeLease(String src, String dst, FileStatus dinfo) 
+  void changeLease(String src, String dst, HdfsFileStatus dinfo) 
                    throws IOException {
     String overwrite;
     String replaceBy;