You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2010/03/19 22:08:44 UTC

svn commit: r925420 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/server/namenode/ ...

Author: hairong
Date: Fri Mar 19 21:08:44 2010
New Revision: 925420

URL: http://svn.apache.org/viewvc?rev=925420&view=rev
Log:
HDFS-985. HDFS should issue multiple RPCs for listing a large directory. Contributed by Hairong Kuang.

Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DirectoryListing.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fs/TestFiListPath.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/namenode/ListPathAspects.aj
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/fs/Hdfs.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNNMetricFilesInGetListingOps.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Mar 19 21:08:44 2010
@@ -100,6 +100,9 @@ Trunk (unreleased changes)
     HDFS-826. The DFSOutputStream has a API that returns the number of
     active datanode(s) in the current pipeline. (dhruba)
 
+    HDFS-985. HDFS should issue multiple RPCs for listing a large
+    directory. (hairong)
+
   OPTIMIZATIONS
 
     HDFS-946. NameNode should not return full path name when lisitng a

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/fs/Hdfs.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/fs/Hdfs.java Fri Mar 19 21:08:44 2010
@@ -23,11 +23,13 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -147,15 +149,51 @@ public class Hdfs extends AbstractFileSy
   @Override
   protected FileStatus[] listStatus(Path f) 
       throws IOException, UnresolvedLinkException {
-    HdfsFileStatus[] infos = dfs.listPaths(getUriPath(f));
-    if (infos == null)
+    String src = getUriPath(f);
+
+    // fetch the first batch of entries in the directory
+    DirectoryListing thisListing = dfs.listPaths(
+        src, HdfsFileStatus.EMPTY_NAME);
+
+    if (thisListing == null) { // the directory does not exist
       throw new FileNotFoundException("File " + f + " does not exist.");
+    }
+    
+    HdfsFileStatus[] partialListing = thisListing.getPartialListing();
+    if (!thisListing.hasMore()) { // got all entries of the directory
+      FileStatus[] stats = new FileStatus[partialListing.length];
+      for (int i = 0; i < partialListing.length; i++) {
+        stats[i] = makeQualified(partialListing[i], f);
+      }
+      return stats;
+    }
 
-    FileStatus [] stats = new FileStatus[infos.length]; 
-    for (int i = 0; i < infos.length; i++) {
-      stats[i] = makeQualified(infos[i], f);
+    // The directory size is too big that it needs to fetch more
+    // estimate the total number of entries in the directory
+    int totalNumEntries =
+      partialListing.length + thisListing.getRemainingEntries();
+    ArrayList<FileStatus> listing =
+      new ArrayList<FileStatus>(totalNumEntries);
+    // add the first batch of entries to the array list
+    for (HdfsFileStatus fileStatus : partialListing) {
+      listing.add(makeQualified(fileStatus, f));
     }
-    return stats;
+ 
+    // now fetch more entries
+    do {
+      thisListing = dfs.listPaths(src, thisListing.getLastName());
+ 
+      if (thisListing == null) {
+        break; // the directory is deleted
+      }
+ 
+      partialListing = thisListing.getPartialListing();
+      for (HdfsFileStatus fileStatus : partialListing) {
+        listing.add(makeQualified(fileStatus, f));
+      }
+    } while (thisListing.hasMore());
+ 
+    return listing.toArray(new FileStatus[listing.size()]);
   }
 
   @Override

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Fri Mar 19 21:08:44 2010
@@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -818,11 +819,22 @@ public class DFSClient implements FSCons
     return getFileInfo(src) != null;
   }
 
-  public HdfsFileStatus[] listPaths(String src) 
+  /**
+   * Get a partial listing of the indicated directory
+   *
+   * Recommend to use HdfsFileStatus.EMPTY_NAME as startAfter
+   * if the application wants to fetch a listing starting from
+   * the first entry in the directory
+   *
+   * @param src the directory name
+   * @param startAfter the name to start listing after encoded in java UTF8
+   * @return a partial listing starting after startAfter
+   */
+  public DirectoryListing listPaths(String src,  byte[] startAfter) 
     throws IOException, UnresolvedLinkException {
     checkOpen();
     try {
-      return namenode.getListing(src);
+      return namenode.getListing(src, startAfter);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
                                      UnresolvedPathException.class);

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Mar 19 21:08:44 2010
@@ -87,6 +87,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-server.xml";
   public static final String  DFS_NAMENODE_NAME_DIR_RESTORE_KEY = "dfs.namenode.name.dir.restore";
   public static final boolean DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT = false;
+  public static final String  DFS_LIST_LIMIT = "dfs.ls.limit";
+  public static final int     DFS_LIST_LIMIT_DEFAULT = 1000;
 
   //Delegation token related keys
   public static final String  DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY = "dfs.namenode.delegation.key.update-interval";

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Mar 19 21:08:44 2010
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 
 import java.io.*;
 import java.net.*;
+import java.util.ArrayList;
 import java.util.EnumSet;
 
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -313,17 +315,61 @@ public class DistributedFileSystem exten
             getUri(), getWorkingDirectory())); // fully-qualify path
   }
 
+  /**
+   * List all the entries of a directory
+   *
+   * Note that this operation is not atomic for a large directory.
+   * The entries of a directory may be fetched from NameNode multiple times.
+   * It only guarantees that  each name occurs once if a directory
+   * undergoes changes between the calls.
+   */
   @Override
   public FileStatus[] listStatus(Path p) throws IOException {
-    HdfsFileStatus[] infos = dfs.listPaths(getPathName(p));
-    if (infos == null) 
+    String src = getPathName(p);
+
+    // fetch the first batch of entries in the directory
+    DirectoryListing thisListing = dfs.listPaths(
+        src, HdfsFileStatus.EMPTY_NAME);
+
+    if (thisListing == null) { // the directory does not exist
       throw new FileNotFoundException("File " + p + " does not exist.");
-    
-    FileStatus[] stats = new FileStatus[infos.length];
-    for (int i = 0; i < infos.length; i++) {
-      stats[i] = makeQualified(infos[i], p);
     }
-    return stats;
+    
+    HdfsFileStatus[] partialListing = thisListing.getPartialListing();
+    if (!thisListing.hasMore()) { // got all entries of the directory
+      FileStatus[] stats = new FileStatus[partialListing.length];
+      for (int i = 0; i < partialListing.length; i++) {
+        stats[i] = makeQualified(partialListing[i], p);
+      }
+      return stats;
+    }
+
+    // The directory size is too big that it needs to fetch more
+    // estimate the total number of entries in the directory
+    int totalNumEntries =
+      partialListing.length + thisListing.getRemainingEntries();
+    ArrayList<FileStatus> listing =
+      new ArrayList<FileStatus>(totalNumEntries);
+    // add the first batch of entries to the array list
+    for (HdfsFileStatus fileStatus : partialListing) {
+      listing.add(makeQualified(fileStatus, p));
+    }
+ 
+    // now fetch more entries
+    do {
+      thisListing = dfs.listPaths(src, thisListing.getLastName());
+ 
+      if (thisListing == null) { // the directory is deleted
+        throw new FileNotFoundException("File " + p + " does not exist.");
+      }
+ 
+      partialListing = thisListing.getPartialListing();
+      for (HdfsFileStatus fileStatus : partialListing) {
+        listing.add(makeQualified(fileStatus, p));
+      }
+    } while (thisListing.hasMore());
+ 
+    return listing.toArray(new FileStatus[listing.size()]);
   }
 
   /**

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Mar 19 21:08:44 2010
@@ -55,9 +55,9 @@ public interface ClientProtocol extends 
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 59: Add API to retrive corrupted block list
+   * 60: Replace full getListing with iterative getListinng.
    */
-  public static final long versionID = 59L;
+  public static final long versionID = 60L;
   
   ///////////////////////////////////////
   // File contents
@@ -353,10 +353,14 @@ public interface ClientProtocol extends 
       throws IOException, UnresolvedLinkException;
 
   /**
-   * Get a listing of the indicated directory.
+   * Get a partial listing of the indicated directory
+   *
+   * @param src the directory name
+   * @param startAfter the name to start listing after encoded in java UTF8
+   * @return a partial listing starting after startAfter
    * @throws UnresolvedLinkException if the path contains a symlink. 
    */
-  public HdfsFileStatus[] getListing(String src) 
+  public DirectoryListing getListing(String src, byte[] startAfter) 
       throws IOException, UnresolvedLinkException; 
 
   ///////////////////////////////////////

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DirectoryListing.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DirectoryListing.java?rev=925420&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DirectoryListing.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DirectoryListing.java Fri Mar 19 21:08:44 2010
@@ -0,0 +1,123 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * This class defines a partial listing of a directory to support
+ * iterative directory listing.
+ */
+public class DirectoryListing implements Writable {
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (DirectoryListing.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new DirectoryListing(); }
+       });
+  }
+
+  private HdfsFileStatus[] partialListing;
+  private int remainingEntries;
+  
+  /**
+   * default constructor
+   */
+  public DirectoryListing() {
+  }
+  
+  /**
+   * constructor
+   * @param partialListing a partial listing of a directory
+   * @param remainingEntries number of entries that are left to be listed
+   */
+  public DirectoryListing(HdfsFileStatus[] partialListing, 
+      int remainingEntries) {
+    if (partialListing == null) {
+      throw new IllegalArgumentException("partial listing should not be null");
+    }
+    if (partialListing.length == 0 && remainingEntries != 0) {
+      throw new IllegalArgumentException("Partial listing is empty but " +
+          "the number of remaining entries is not zero");
+    }
+    this.partialListing = partialListing;
+    this.remainingEntries = remainingEntries;
+  }
+
+  /**
+   * Get the partial listing of file status
+   * @return the partial listing of file status
+   */
+  public HdfsFileStatus[] getPartialListing() {
+    return partialListing;
+  }
+  
+  /**
+   * Get the number of remaining entries that are left to be listed
+   * @return the number of remaining entries that are left to be listed
+   */
+  public int getRemainingEntries() {
+    return remainingEntries;
+  }
+  
+  /**
+   * Check if there are more entries that are left to be listed
+   * @return true if there are more entries that are left to be listed;
+   *         return false otherwise.
+   */
+  public boolean hasMore() {
+    return remainingEntries != 0;
+  }
+  
+  /**
+   * Get the last name in this list
+   * @return the last name in the list if it is not empty; otherwise return null
+   */
+  public byte[] getLastName() {
+    if (partialListing.length == 0) {
+      return null;
+    }
+    return partialListing[partialListing.length-1].getLocalNameInBytes();
+  }
+
+  // Writable interface
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEntries = in.readInt();
+    partialListing = new HdfsFileStatus[numEntries];
+    for (int i=0; i<numEntries; i++) {
+      partialListing[i] = new HdfsFileStatus();
+      partialListing[i].readFields(in);
+    }
+    remainingEntries = in.readInt();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(partialListing.length);
+    for (HdfsFileStatus fileStatus : partialListing) {
+      fileStatus.write(out);
+    }
+    out.writeInt(remainingEntries);
+  }
+}

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java Fri Mar 19 21:08:44 2010
@@ -26,10 +26,19 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
 
 /** Interface that represents the over the wire information for a file.
  */
 public class HdfsFileStatus implements Writable {
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (HdfsFileStatus.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new HdfsFileStatus(); }
+       });
+  }
 
   private byte[] path;  // local name of the inode that's encoded in java UTF8
   private byte[] symlink; // symlink target encoded in java UTF8
@@ -180,6 +189,14 @@ public class HdfsFileStatus implements W
   }
   
   /**
+   * Get the Java UTF8 representation of the local name
+   * @return the local name in java UTF8
+   */
+  final public byte[] getLocalNameInBytes() {
+    return path;
+  }
+
+  /**
    * Get the string representation of the full path name
    * @param parent the parent path
    * @return the full path in string

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Fri Mar 19 21:08:44 2010
@@ -32,11 +32,11 @@ import javax.servlet.jsp.JspWriter;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.BlockAccessToken;
@@ -128,7 +128,6 @@ public class DatanodeJspHelper {
         return;
       }
       // directory
-      HdfsFileStatus[] files = dfs.listPaths(target);
       // generate a table and dump the info
       String[] headings = { "Name", "Type", "Size", "Replication",
           "Block Size", "Modification Time", "Permission", "Owner", "Group" };
@@ -146,42 +145,49 @@ public class DatanodeJspHelper {
             + JspHelper.SET_DELEGATION + tokenString
             + "\">Go to parent directory</a><br>");
 
-      if (files == null || files.length == 0) {
+      DirectoryListing thisListing = 
+        dfs.listPaths(target, HdfsFileStatus.EMPTY_NAME);
+      if (thisListing == null || thisListing.getPartialListing().length == 0) {
         out.print("Empty directory");
       } else {
         JspHelper.addTableHeader(out);
         int row = 0;
         JspHelper.addTableRow(out, headings, row++);
         String cols[] = new String[headings.length];
-        for (int i = 0; i < files.length; i++) {
-          String localFileName = files[i].getLocalName();
-          // Get the location of the first block of the file
-          if (localFileName.endsWith(".crc"))
-            continue;
-          if (!files[i].isDir()) {
-            cols[1] = "file";
-            cols[2] = StringUtils.byteDesc(files[i].getLen());
-            cols[3] = Short.toString(files[i].getReplication());
-            cols[4] = StringUtils.byteDesc(files[i].getBlockSize());
-          } else {
-            cols[1] = "dir";
-            cols[2] = "";
-            cols[3] = "";
-            cols[4] = "";
-          }
-          String datanodeUrl = req.getRequestURL() + "?dir="
+        do {
+          HdfsFileStatus[] files = thisListing.getPartialListing();
+          for (int i = 0; i < files.length; i++) {
+            String localFileName = files[i].getLocalName();
+            // Get the location of the first block of the file
+            if (!files[i].isDir()) {
+              cols[1] = "file";
+              cols[2] = StringUtils.byteDesc(files[i].getLen());
+              cols[3] = Short.toString(files[i].getReplication());
+              cols[4] = StringUtils.byteDesc(files[i].getBlockSize());
+            } else {
+              cols[1] = "dir";
+              cols[2] = "";
+              cols[3] = "";
+              cols[4] = "";
+            }
+            String datanodeUrl = req.getRequestURL() + "?dir="
               + URLEncoder.encode(files[i].getFullName(target), "UTF-8")
               + "&namenodeInfoPort=" + namenodeInfoPort
               + JspHelper.SET_DELEGATION + tokenString;
-          cols[0] = "<a href=\"" + datanodeUrl + "\">"
+            cols[0] = "<a href=\"" + datanodeUrl + "\">"
               + localFileName + "</a>";
-          cols[5] = FsShell.dateForm.format(new Date((files[i]
+            cols[5] = FsShell.dateForm.format(new Date((files[i]
               .getModificationTime())));
-          cols[6] = files[i].getPermission().toString();
-          cols[7] = files[i].getOwner();
-          cols[8] = files[i].getGroup();
-          JspHelper.addTableRow(out, cols, row++);
-        }
+            cols[6] = files[i].getPermission().toString();
+            cols[7] = files[i].getOwner();
+            cols[8] = files[i].getGroup();
+            JspHelper.addTableRow(out, cols, row++);
+          }
+          if (!thisListing.hasMore()) {
+            break;
+          }
+          thisListing = dfs.listPaths(target, thisListing.getLastName());
+        } while (thisListing != null);
         JspHelper.addTableFooter(out);
       }
     }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Mar 19 21:08:44 2010
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
@@ -65,6 +66,7 @@ class FSDirectory implements Closeable {
   private volatile boolean ready = false;
   private MetricsRecord directoryMetrics = null;
   private static final long UNKNOWN_DISK_SPACE = -1;
+  private final int lsLimit;  // max list limit
   
   /** Access an existing dfs name directory. */
   FSDirectory(FSNamesystem ns, Configuration conf) {
@@ -84,6 +86,10 @@ class FSDirectory implements Closeable {
         ns.createFsOwnerPermissions(new FsPermission((short)0755)),
         Integer.MAX_VALUE, UNKNOWN_DISK_SPACE);
     this.fsImage = fsImage;
+    int configuredLimit = conf.getInt(
+        DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
+    this.lsLimit = configuredLimit>0 ?
+        configuredLimit : DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT;
     initialize(conf);
   }
     
@@ -1048,30 +1054,37 @@ class FSDirectory implements Closeable {
   }
 
   /**
-   * Get a listing of files given path 'src'
+   * Get a partial listing of the indicated directory
    *
-   * This function is admittedly very inefficient right now.  We'll
-   * make it better later.
+   * @param src the directory name
+   * @param startAfter the name to start listing after
+   * @return a partial listing starting after startAfter
    */
-  HdfsFileStatus[] getListing(String src) throws UnresolvedLinkException {
+  DirectoryListing getListing(String src, byte[] startAfter)
+  throws UnresolvedLinkException {
     String srcs = normalizePath(src);
 
     synchronized (rootDir) {
       INode targetNode = rootDir.getNode(srcs, true);
       if (targetNode == null)
         return null;
+      
       if (!targetNode.isDirectory()) {
-        return new HdfsFileStatus[]{createFileStatus(
-            HdfsFileStatus.EMPTY_NAME, targetNode)};
+        return new DirectoryListing(new HdfsFileStatus[]{createFileStatus(
+            HdfsFileStatus.EMPTY_NAME, targetNode)}, 0);
       }
-      List<INode> contents = ((INodeDirectory)targetNode).getChildren();
-      HdfsFileStatus listing[] = new HdfsFileStatus[contents.size()];
-      int i = 0;
-      for (INode cur : contents) {
+      INodeDirectory dirInode = (INodeDirectory)targetNode;
+      List<INode> contents = dirInode.getChildren();
+      int startChild = dirInode.nextChild(startAfter);
+      int totalNumChildren = contents.size();
+      int numOfListing = Math.min(totalNumChildren-startChild, this.lsLimit);
+      HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
+      for (int i=0; i<numOfListing; i++) {
+        INode cur = contents.get(startChild+i);
         listing[i] = createFileStatus(cur.name, cur);
-        i++;
       }
-      return listing;
+      return new DirectoryListing(
+          listing, totalNumChildren-startChild-numOfListing);
     }
   }
 
@@ -1479,6 +1492,9 @@ class FSDirectory implements Closeable {
     }
     updateCount(pathComponents, pos, counts.getNsCount(), childDiskspace,
         checkQuota);
+    if (pathComponents[pos-1] == null) {
+      throw new NullPointerException("Panic: parent does not exist");
+    }
     T addedNode = ((INodeDirectory)pathComponents[pos-1]).addChild(
         child, inheritPermission);
     if (addedNode == null) {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Mar 19 21:08:44 2010
@@ -2208,10 +2208,13 @@ public class FSNamesystem implements FSC
   }
 
   /**
-   * Get a listing of all files at 'src'.  The Object[] array
-   * exists so we can return file attributes (soon to be implemented)
+   * Get a partial listing of the indicated directory
+   *
+   * @param src the directory name
+   * @param startAfter the name to start after
+   * @return a partial listing starting after startAfter
    */
-  public HdfsFileStatus[] getListing(String src) 
+  public DirectoryListing getListing(String src, byte[] startAfter) 
     throws IOException, UnresolvedLinkException {
     if (isPermissionEnabled) {
       if (dir.isDir(src)) {
@@ -2226,7 +2229,7 @@ public class FSNamesystem implements FSC
                     Server.getRemoteIp(),
                     "listStatus", src, null, null);
     }
-    return dir.getListing(src);
+    return dir.getListing(src, startAfter);
   }
 
   /////////////////////////////////////////////////////////

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Fri Mar 19 21:08:44 2010
@@ -239,6 +239,23 @@ class INodeDirectory extends INode {
   }
 
   /**
+   * Given a child's name, return the index of the next child
+   *
+   * @param name a child's name
+   * @return the index of the next child
+   */
+  int nextChild(byte[] name) {
+    if (name.length == 0) { // empty name
+      return 0;
+    }
+    int nextPos = Collections.binarySearch(children, name) + 1;
+    if (nextPos >= 0) {
+      return nextPos;
+    }
+    return -nextPos;
+  }
+
+  /**
    * Add a child inode to the directory.
    * 
    * @param node INode to insert

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java Fri Mar 19 21:08:44 2010
@@ -18,12 +18,11 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.VersionInfo;
 
@@ -38,7 +37,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Stack;
 import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -164,24 +162,32 @@ public class ListPathsServlet extends Df
       while (!pathstack.empty()) {
         String p = pathstack.pop();
         try {
-          HdfsFileStatus[] listing = nnproxy.getListing(p);
-          if (listing == null) {
-            LOG.warn("ListPathsServlet - Path " + p + " does not exist");
-            continue;
-          }
-          for (HdfsFileStatus i : listing) {
-            String localName = i.getLocalName();
-            if (exclude.matcher(localName).matches()
-                || !filter.matcher(localName).matches()) {
-              continue;
+          byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
+          DirectoryListing thisListing;
+          do {
+            assert lastReturnedName != null;
+            thisListing = nnproxy.getListing(p, lastReturnedName);
+            if (thisListing == null) {
+              if (lastReturnedName.length == 0) {
+                LOG.warn("ListPathsServlet - Path " + p + " does not exist");
+              }
+              break;
             }
-            if (recur && i.isDir()) {
-              pathstack.push(new Path(p, localName).toUri().getPath());
+            HdfsFileStatus[] listing = thisListing.getPartialListing();
+            for (HdfsFileStatus i : listing) {
+              String localName = i.getLocalName();
+              if (exclude.matcher(localName).matches()
+                  || !filter.matcher(localName).matches()) {
+                continue;
+              }
+              if (recur && i.isDir()) {
+                pathstack.push(new Path(p, localName).toUri().getPath());
+              }
+              writeInfo(p, i, doc);
             }
-            writeInfo(p, i, doc);
-          }
-        }
-        catch(RemoteException re) {re.writeXml(p, doc);}
+            lastReturnedName = thisListing.getLastName();
+          } while (thisListing.hasMore());
+        } catch(RemoteException re) {re.writeXml(p, doc);}
       }
       if (doc != null) {
         doc.endDocument();

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Mar 19 21:08:44 2010
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -842,11 +843,13 @@ public class NameNode implements ClientP
 
   /**
    */
-  public HdfsFileStatus[] getListing(String src) throws IOException {
-    HdfsFileStatus[] files = namesystem.getListing(src);
+  @Override
+  public DirectoryListing getListing(String src, byte[] startAfter)
+  throws IOException {
+    DirectoryListing files = namesystem.getListing(src, startAfter);
     if (files != null) {
       myMetrics.numGetListingOps.inc();
-      myMetrics.numFilesInGetListingOps.inc(files.length);
+      myMetrics.numFilesInGetListingOps.inc(files.getPartialListing().length);
     }
     return files;
   }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Fri Mar 19 21:08:44 2010
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -141,11 +142,9 @@ public class NamenodeFsck {
     try {
       Result res = new Result(conf);
 
-      final HdfsFileStatus[] files = namenode.getListing(path);
-      if (files != null) {
-        for (int i = 0; i < files.length; i++) {
-          check(path, files[i], res);
-        }
+      final HdfsFileStatus file = namenode.getFileInfo(path);
+      if (file != null) {
+        check(path, file, res);
         out.println(res);
         out.println(" Number of data-nodes:\t\t" + totalDatanodes);
         out.println(" Number of racks:\t\t" + networktopology.getNumOfRacks());
@@ -176,17 +175,24 @@ public class NamenodeFsck {
     boolean isOpen = false;
 
     if (file.isDir()) {
-      final HdfsFileStatus[] files = namenode.getListing(path);
-      if (files == null) {
-        return;
-      }
+      byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
+      DirectoryListing thisListing;
       if (showFiles) {
         out.println(path + " <dir>");
       }
       res.totalDirs++;
-      for (int i = 0; i < files.length; i++) {
-        check(path, files[i], res);
-      }
+      do {
+        assert lastReturnedName != null;
+        thisListing = namenode.getListing(path, lastReturnedName);
+        if (thisListing == null) {
+          return;
+        }
+        HdfsFileStatus[] files = thisListing.getPartialListing();
+        for (int i = 0; i < files.length; i++) {
+          check(path, files[i], res);
+        }
+        lastReturnedName = thisListing.getLastName();
+      } while (thisListing.hasMore());
       return;
     }
     long fileLen = file.getLen();

Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fs/TestFiListPath.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fs/TestFiListPath.java?rev=925420&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fs/TestFiListPath.java (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fs/TestFiListPath.java Fri Mar 19 21:08:44 2010
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A large directory listing may have to go through multiple RPCs.
+ * The directory to be listed may be removed before all contents are listed.
+ * 
+ * This test uses AspectJ to simulate the scenario.
+ */
+public class TestFiListPath {
+  private static final Log LOG = LogFactory.getLog(TestFiListPath.class);
+  private static final int LIST_LIMIT = 1;
+  
+  private MiniDFSCluster cluster = null;
+
+  @Before
+  public void setup() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, LIST_LIMIT);
+    cluster = new MiniDFSCluster(conf, 1, true, null);
+    cluster.waitClusterUp();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /** Remove the target directory after the getListing RPC */
+  @Test
+  public void testTargetDeletion() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    Path parent = new Path("/tmp");
+    fs.mkdirs(parent);
+    for (int i=0; i<LIST_LIMIT+1; i++) {
+      fs.mkdirs(new Path(parent, "dir"+i));
+    }
+    try {
+      fs.listStatus(parent);
+      fail("Test should fail with FileNotFoundException");
+    } catch (FileNotFoundException e) {
+      assertEquals("File " + parent + " does not exist.", e.getMessage());
+      LOG.info(StringUtils.stringifyException(e));
+    }
+  }
+}

Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/namenode/ListPathAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/namenode/ListPathAspects.aj?rev=925420&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/namenode/ListPathAspects.aj (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/namenode/ListPathAspects.aj Fri Mar 19 21:08:44 2010
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+
+/**
+ * The aspects here are used for testing HDFS implementation of iterative
+ * directory listing functionality. A directory is deleted right after
+ * the first listPath RPC. 
+ */
+public privileged aspect ListPathAspects {
+  public static final Log LOG = LogFactory.getLog(ListPathAspects.class);
+
+  /** When removeChild is called during rename, throw exception */
+  pointcut callGetListing(FSNamesystem fd, String src, byte[] startAfter) : 
+    call(DirectoryListing FSNamesystem.getListing(String, byte[]))
+    && target(fd)
+    && args(src, startAfter);
+
+  after(FSNamesystem fd, String src, byte[] startAfter) 
+    throws IOException, UnresolvedLinkException: callGetListing(fd, src, startAfter) {
+    LOG.info("FI: callGetListing");
+    fd.delete(src, true);
+  }
+}
\ No newline at end of file

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Mar 19 21:08:44 2010
@@ -224,7 +224,7 @@ public class TestDFSClientRetries extend
 
     public boolean mkdirs(String src, FsPermission masked, boolean createParent) throws IOException { return false; }
 
-    public HdfsFileStatus[] getListing(String src) throws IOException { return null; }
+    public DirectoryListing getListing(String src, byte[] startName) throws IOException { return null; }
 
     public void renewLease(String clientName) throws IOException {}
 

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java Fri Mar 19 21:08:44 2010
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
 
 import junit.framework.TestCase;
 import java.io.*;
-import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.TreeMap;
@@ -28,9 +27,10 @@ import java.util.zip.CRC32;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -127,43 +127,43 @@ public class TestDFSUpgradeFromImage ext
   
   CRC32 overallChecksum = new CRC32();
   
-  private void verifyDir(DFSClient client, String dir) 
+  private void verifyDir(DistributedFileSystem dfs, Path dir) 
                                            throws IOException {
     
-    HdfsFileStatus[] fileArr = client.listPaths(dir);
-    TreeMap<String, Boolean> fileMap = new TreeMap<String, Boolean>();
+    FileStatus[] fileArr = dfs.listStatus(dir);
+    TreeMap<Path, Boolean> fileMap = new TreeMap<Path, Boolean>();
     
-    for(HdfsFileStatus file : fileArr) {
-      String path = file.getFullName(dir);
-      fileMap.put(path, Boolean.valueOf(file.isDir()));
+    for(FileStatus file : fileArr) {
+      fileMap.put(file.getPath(), Boolean.valueOf(file.isDir()));
     }
     
-    for(Iterator<String> it = fileMap.keySet().iterator(); it.hasNext();) {
-      String path = it.next();
+    for(Iterator<Path> it = fileMap.keySet().iterator(); it.hasNext();) {
+      Path path = it.next();
       boolean isDir = fileMap.get(path);
       
-      overallChecksum.update(path.getBytes());
+      String pathName = path.toUri().getPath();
+      overallChecksum.update(pathName.getBytes());
       
       if ( isDir ) {
-        verifyDir(client, path);
+        verifyDir(dfs, path);
       } else {
         // this is not a directory. Checksum the file data.
         CRC32 fileCRC = new CRC32();
-        FSInputStream in = client.open(path);
+        FSInputStream in = dfs.dfs.open(pathName);
         byte[] buf = new byte[4096];
         int nRead = 0;
         while ( (nRead = in.read(buf, 0, buf.length)) > 0 ) {
           fileCRC.update(buf, 0, nRead);
         }
         
-        verifyChecksum(path, fileCRC.getValue());
+        verifyChecksum(pathName, fileCRC.getValue());
       }
     }
   }
   
-  private void verifyFileSystem(DFSClient client) throws IOException {
+  private void verifyFileSystem(DistributedFileSystem dfs) throws IOException {
   
-    verifyDir(client, "/");
+    verifyDir(dfs, new Path("/"));
     
     verifyChecksum("overallCRC", overallChecksum.getValue());
     
@@ -185,8 +185,8 @@ public class TestDFSUpgradeFromImage ext
       cluster = new MiniDFSCluster(0, conf, numDataNodes, false, true,
                                    StartupOption.UPGRADE, null);
       cluster.waitActive();
-      DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
-                                           cluster.getNameNodePort()), conf);
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      DFSClient dfsClient = dfs.dfs;
       //Safemode will be off only after upgrade is complete. Wait for it.
       while ( dfsClient.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET) ) {
         LOG.info("Waiting for SafeMode to be OFF.");
@@ -195,7 +195,7 @@ public class TestDFSUpgradeFromImage ext
         } catch (InterruptedException ignored) {}
       }
 
-      verifyFileSystem(dfsClient);
+      verifyFileSystem(dfs);
     } finally {
       if (cluster != null) { cluster.shutdown(); }
     }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java Fri Mar 19 21:08:44 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Random;
 
@@ -72,6 +73,7 @@ public class TestFileStatus extends Test
    */
   public void testFileStatus() throws IOException {
     Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fs = cluster.getFileSystem();
     final HftpFileSystem hftpfs = cluster.getHftpFileSystem();
@@ -83,8 +85,6 @@ public class TestFileStatus extends Test
       //
       Path path = new Path("/");
       System.out.println("Path : \"" + path.toString() + "\"");
-      System.out.println(fs.isDirectory(path));
-      System.out.println(fs.getFileStatus(path).isDir()); 
       assertTrue("/ should be a directory", 
                  fs.getFileStatus(path).isDir() == true);
       
@@ -134,9 +134,24 @@ public class TestFileStatus extends Test
           fs.getWorkingDirectory()).toString(), 
           status.getPath().toString());
 
-      // create an empty directory
-      //
+      // test file status on a directory
       Path dir = new Path("/test/mkdirs");
+
+      // test listStatus on a non-existent file/directory
+      try {
+        stats = fs.listStatus(dir);
+        fail("listStatus of non-existent path should fail");
+      } catch (FileNotFoundException fe) {
+        assertTrue(fe.getMessage().equals("File " + dir + " does not exist."));
+      }
+      try {
+        status = fs.getFileStatus(dir);
+        fail("getFileStatus of non-existent path should fail");
+      } catch (FileNotFoundException fe) {
+        assertTrue(fe.getMessage().startsWith("File does not exist"));
+      }
+
+      // create the directory
       assertTrue(fs.mkdirs(dir));
       assertTrue(fs.exists(dir));
       System.out.println("Dir : \"" + dir + "\"");
@@ -170,9 +185,8 @@ public class TestFileStatus extends Test
       status = fs.getFileStatus(file2);
       assertTrue(status.getBlockSize() == blockSize);
       assertTrue(status.getReplication() == 1);
-      assertEquals(file2.makeQualified(
-          fs.getUri(), fs.getWorkingDirectory()).toString(), 
-          status.getPath().toString());
+      file2 = fs.makeQualified(file2);
+      assertEquals(file2.toString(), status.getPath().toString());
 
       // create another file in the same directory
       Path file3 = new Path(dir, "filestatus3.dat");
@@ -180,6 +194,7 @@ public class TestFileStatus extends Test
       System.out.println("Created file filestatus3.dat with one "
                          + " replicas.");
       checkFile(fs, file3, 1);
+      file3 = fs.makeQualified(file3);
 
       // verify that the size of the directory increased by the size 
       // of the two files
@@ -192,15 +207,34 @@ public class TestFileStatus extends Test
       // test listStatus on a non-empty directory
       stats = fs.listStatus(dir);
       assertEquals(dir + " should have two entries", 2, stats.length);
-      String qualifiedFile2 = file2.makeQualified(fs.getUri(), 
-          fs.getWorkingDirectory()).toString();
-      String qualifiedFile3 = file3.makeQualified(fs.getUri(), 
-          fs.getWorkingDirectory()).toString();
-      for(FileStatus stat:stats) {
-        String statusFullName = stat.getPath().toString();
-        assertTrue(qualifiedFile2.equals(statusFullName)
-          || qualifiedFile3.toString().equals(statusFullName));
-      }
+       assertEquals(file2.toString(), stats[0].getPath().toString());
+       assertEquals(file3.toString(), stats[1].getPath().toString());
+
+      // test iterative listing
+      // now dir has 2 entries, create one more
+      Path dir3 = fs.makeQualified(new Path(dir, "dir3"));
+      fs.mkdirs(dir3);
+      dir3 = fs.makeQualified(dir3);
+      stats = fs.listStatus(dir);
+      assertEquals(dir + " should have three entries", 3, stats.length);
+      assertEquals(dir3.toString(), stats[0].getPath().toString());
+      assertEquals(file2.toString(), stats[1].getPath().toString());
+      assertEquals(file3.toString(), stats[2].getPath().toString());
+
+      // now dir has 3 entries, create two more
+      Path dir4 = fs.makeQualified(new Path(dir, "dir4"));
+      fs.mkdirs(dir4);
+      dir4 = fs.makeQualified(dir4);
+      Path dir5 = fs.makeQualified(new Path(dir, "dir5"));
+      fs.mkdirs(dir5);
+      dir5 = fs.makeQualified(dir5);
+      stats = fs.listStatus(dir);
+      assertEquals(dir + " should have five entries", 5, stats.length);
+      assertEquals(dir3.toString(), stats[0].getPath().toString());
+      assertEquals(dir4.toString(), stats[1].getPath().toString());
+      assertEquals(dir5.toString(), stats[2].getPath().toString());
+      assertEquals(file2.toString(), stats[3].getPath().toString());
+      assertEquals(file3.toString(), stats[4].getPath().toString());
     } finally {
       fs.close();
       cluster.shutdown();

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNNMetricFilesInGetListingOps.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNNMetricFilesInGetListingOps.java?rev=925420&r1=925419&r2=925420&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNNMetricFilesInGetListingOps.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNNMetricFilesInGetListingOps.java Fri Mar 19 21:08:44 2010
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -74,9 +75,9 @@ public class TestNNMetricFilesInGetListi
     createFile("/tmp1/t2", 3200, (short)3);
     createFile("/tmp2/t1", 3200, (short)3);
     createFile("/tmp2/t2", 3200, (short)3);
-    cluster.getNameNode().getListing("/tmp1") ;
+    cluster.getNameNode().getListing("/tmp1", HdfsFileStatus.EMPTY_NAME);
     assertEquals(2,nnMetrics.numFilesInGetListingOps.getCurrentIntervalValue());
-    cluster.getNameNode().getListing("/tmp2") ;
+    cluster.getNameNode().getListing("/tmp2", HdfsFileStatus.EMPTY_NAME) ;
     assertEquals(4,nnMetrics.numFilesInGetListingOps.getCurrentIntervalValue());
   }
 }