You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/09/20 01:40:54 UTC

svn commit: r697284 [1/2] - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/fs/ src/core/org/apache/hadoop/fs/shell/ src/docs/src/documentation/content/xdocs/ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/...

Author: rangadi
Date: Fri Sep 19 16:40:54 2008
New Revision: 697284

URL: http://svn.apache.org/viewvc?rev=697284&view=rev
Log:
HADOOP-3938. Disk space quotas for HDFS. This is similar to namespace
quotas in 0.18. (rangadi)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/ContentSummary.java
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/shell/Count.java
    hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hdfs_quota_admin_guide.xml
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/tools/DFSAdmin.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestQuota.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 19 16:40:54 2008
@@ -75,6 +75,9 @@
     DFS Used%: DFS used space/Present Capacity
     (Suresh Srinivas via hairong)
 
+    HADOOP-3938. Disk space quotas for HDFS. This is similar to namespace
+    quotas in 0.18. (rangadi)
+
   NEW FEATURES
 
     HADOOP-3341. Allow streaming jobs to specify the field separator for map

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/ContentSummary.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/ContentSummary.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/ContentSummary.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/ContentSummary.java Fri Sep 19 16:40:54 2008
@@ -29,25 +29,28 @@
   private long fileCount;
   private long directoryCount;
   private long quota;
+  private long spaceConsumed;
+  private long spaceQuota;
+  
 
   /** Constructor */
   public ContentSummary() {}
   
   /** Constructor */
   public ContentSummary(long length, long fileCount, long directoryCount) {
-    this.length = length;
-    this.fileCount = fileCount;
-    this.directoryCount = directoryCount;
-    this.quota = -1L;
+    this(length, fileCount, directoryCount, -1L, length, -1L);
   }
 
   /** Constructor */
   public ContentSummary(
-      long length, long fileCount, long directoryCount, long quota) {
+      long length, long fileCount, long directoryCount, long quota,
+      long spaceConsumed, long spaceQuota) {
     this.length = length;
     this.fileCount = fileCount;
     this.directoryCount = directoryCount;
     this.quota = quota;
+    this.spaceConsumed = spaceConsumed;
+    this.spaceQuota = spaceQuota;
   }
 
   /** @return the length */
@@ -62,12 +65,20 @@
   /** Return the directory quota */
   public long getQuota() {return quota;}
   
+  /** Returns (disk) space consumed */
+  public long getSpaceConsumed() {return spaceConsumed;}
+
+  /** Returns (disk) space quota */
+  public long getSpaceQuota() {return spaceQuota;}
+  
   /** {@inheritDoc} */
   public void write(DataOutput out) throws IOException {
     out.writeLong(length);
     out.writeLong(fileCount);
     out.writeLong(directoryCount);
     out.writeLong(quota);
+    out.writeLong(spaceConsumed);
+    out.writeLong(spaceQuota);
   }
 
   /** {@inheritDoc} */
@@ -76,6 +87,8 @@
     this.fileCount = in.readLong();
     this.directoryCount = in.readLong();
     this.quota = in.readLong();
+    this.spaceConsumed = in.readLong();
+    this.spaceQuota = in.readLong();
   }
   
   /** 
@@ -86,20 +99,20 @@
   private static final String STRING_FORMAT = "%12d %12d %18d ";
   /** 
    * Output format:
-   * <----12----> <----15----> <----12----> <----12----> <-------18------->
-   *    QUOTA   REMAINING_QUATA  DIR_COUNT   FILE_COUNT        CONTENT_SIZE FILE_NAME    
+   * <----12----> <----15----> <----15----> <----15----> <----12----> <----12----> <-------18------->
+   *    QUOTA   REMAINING_QUOTA SPACE_QUOTA SPACE_QUOTA_REM DIR_COUNT   FILE_COUNT   CONTENT_SIZE     FILE_NAME    
    */
-  private static final String QUOTA_STRING_FORMAT = "%12d %15d "+STRING_FORMAT;
-  private static final String NON_QUOTA_STRING_FORMAT =
-    "%12s %15s "+STRING_FORMAT;
-
+  private static final String QUOTA_STRING_FORMAT = "%12s %15s ";
+  private static final String SPACE_QUOTA_STRING_FORMAT = "%15s %15s ";
+  
   /** The header string */
   private static final String HEADER = String.format(
       STRING_FORMAT.replace('d', 's'), "directories", "files", "bytes");
 
   private static final String QUOTA_HEADER = String.format(
-      QUOTA_STRING_FORMAT.replace('d', 's'), 
-      "quota", "remaining quota", "directories", "files", "bytes");
+      QUOTA_STRING_FORMAT + SPACE_QUOTA_STRING_FORMAT, 
+      "quota", "remaining quota", "space quota", "reamaining quota") +
+      HEADER;
   
   /** Return the header of the output.
    * if qOption is false, output directory count, file count, and content size;
@@ -125,17 +138,27 @@
    * @return the string representation of the object
    */
   public String toString(boolean qOption) {
+    String prefix = "";
     if (qOption) {
+      String quotaStr = "none";
+      String quotaRem = "inf";
+      String spaceQuotaStr = "none";
+      String spaceQuotaRem = "inf";
+      
       if (quota>0) {
-        long remainingQuota = quota-(directoryCount+fileCount);
-        return String.format(QUOTA_STRING_FORMAT, quota, remainingQuota,
-            directoryCount, fileCount, length);
-      } else {
-        return String.format(NON_QUOTA_STRING_FORMAT, "none", "inf",
-            directoryCount, fileCount, length);
+        quotaStr = Long.toString(quota);
+        quotaRem = Long.toString(quota-(directoryCount+fileCount));
+      }
+      if (spaceQuota>0) {
+        spaceQuotaStr = Long.toString(spaceQuota);
+        spaceQuotaRem = Long.toString(spaceQuota - spaceConsumed);        
       }
-    } else {
-      return String.format(STRING_FORMAT, directoryCount, fileCount, length);
+      
+      prefix = String.format(QUOTA_STRING_FORMAT + SPACE_QUOTA_STRING_FORMAT, 
+                             quotaStr, quotaRem, spaceQuotaStr, spaceQuotaRem);
     }
+    
+    return prefix + String.format(STRING_FORMAT, directoryCount, 
+                                  fileCount, length);
   }
 }

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/shell/Count.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/shell/Count.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/shell/Count.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/shell/Count.java Fri Sep 19 16:40:54 2008
@@ -34,7 +34,8 @@
       "Count the number of directories, files and bytes under the paths",
       "that match the specified file pattern.  The output columns are:",
       "DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME or",
-      "QUOTA REMAINING_QUATA DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME");
+      "QUOTA REMAINING_QUATA SPACE_QUOTA REMAINING_SPACE_QUOTA ",
+      "      DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME");
   
   private boolean qOption;
 

Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hdfs_quota_admin_guide.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hdfs_quota_admin_guide.xml?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hdfs_quota_admin_guide.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/hdfs_quota_admin_guide.xml Fri Sep 19 16:40:54 2008
@@ -1,84 +1,105 @@
-<?xml version="1.0"?>
-<!--
-  Copyright 2002-2004 The Apache Software Foundation
-
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
+<?xml version="1.0"?> 
 
-<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN"
-          "http://forrest.apache.org/dtd/document-v20.dtd">
+<!-- Copyright 2002-2004 The Apache Software Foundation
 
+ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+governing permissions and limitations under the License. -->
+
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
 
 <document>
 
-  <header>
-    <title>
-      Name Space Quotas Administrator Guide
-    </title>
-  </header>
-
-  <body>
-      <p>
-      The Hadoop Distributed File System (HDFS) allows the administrator to set quotas on individual directories. 
-      Newly created directories have no associated quota. 
-      The largest quota is <code>Long.Max_Value</code>. A quota of one forces a directory 
-      to remain empty.
-      </p>
-
-      <p>
-      The directory quota is a hard limit on the number of names in the tree 
-      rooted at that directory. File and directory creations fault if the quota 
-      would be exceeded. Quotas stick to renamed directories; the rename 
-      operation faults if operation would result in a quota violation. 
-      The attempt to set a quota faults if the directory would be in violation 
-      of the new quota.
-      </p>
-
-      <p>
-      Quotas are persistent with the fsimage. When starting, if the fsimage 
-      is immediately in violation of a quota (perhaps the fsimage was 
-      surreptitiously modified), the startup operation fails with an error report. 
-      Setting or removing a quota creates a journal entry.
-      </p> 
-
-      <p>
-      The following new commands or new options are added to support quotas. 
-      The first two are administration commands.
-      </p>
-
-      <ul>
-      <li>
-      <code>dfsadmin -setquota &lt;N> &lt;directory>...&lt;directory></code> 
-      <br />
-      Set the quota to be <code>N</code> for each directory. Best effort for each directory,
-      with faults reported if <code>N</code> is not a positive long integer, 
-      the directory does not exist or it is a file, or the directory would 
-      immediately exceed the new quota.
-      </li>
-  
-      <li>
-      <code>dfsadmin -clrquota &lt;directory>...&lt;director></code><br /> 
-      Remove any quota for each directory. Best effort for each directory, 
-      with faults reported if the directory does not exist or it is a file. 
-      It is not a fault if the directory has no quota.
-      </li>
-  
-      <li>
-      <code>fs -count -q &lt;directory>...&lt;directory></code><br />
-      With the <code>-q</code> option, also report the quota value set for each 
-      directory, and the available quota remaining. If the directory does not have 
-      a quota set, the reported values are <code>none</code> and <code>inf</code>.
-      </li>
-      </ul>
-   </body>
+ <header> <title> Directory Quotas Administrator's Guide </title> </header>
+
+ <body>
+
+ <p> The Hadoop Distributed File System (HDFS) allows the administrator to set quotas for the number of names used and the
+amount of space used for individual directories. Name quotas and space quotas operate independently, but the administration and
+implementation of the two types of quotas are closely parallel. </p>
+
+<section> <title>Name Quotas</title>
+
+ <p> The name quota is a hard limit on the number of file and directory names in the tree rooted at that directory. File and
+directory creations fail if the quota would be exceeded. Quotas stick with renamed directories; the rename operation fails if
+the operation would result in a quota violation. The attempt to set a quota fails if the directory would be in violation of the new
+quota. A newly created directory has no associated quota. The largest quota is <code>Long.Max_Value</code>. A quota of one
+forces a directory to remain empty. (Yes, a directory counts against its own quota!) </p>
+
+ <p> Quotas are persistent with the <code>fsimage</code>. When starting, if the <code>fsimage</code> is immediately in
+violation of a quota (perhaps the <code>fsimage</code> was surreptitiously modified), 
+a warning is printed for each such violation. Setting or removing a quota creates a journal entry. </p> </section>
+
+<section> <title>Space Quotas</title>
+
+ <p> The space quota is a hard limit on the number of bytes used by files in the tree rooted at that directory. Block
+allocations fail if the quota would not allow a full block to be written. Each replica of a block counts against the quota. Quotas
+stick with renamed directories; the rename operation fails if the operation would result in a quota violation. The attempt to
+set a quota fails if the directory would be in violation of the new quota. A newly created directory has no associated quota.
+The largest quota is <code>Long.Max_Value</code>. A quota of zero still permits files to be created, but no blocks can be added to the files.
+Directories don't use host file system space and don't count against the space quota. The host file system space used to save
+the file meta data is not counted against the quota. Quotas are charged at the intended replication factor for the file;
+changing the replication factor for a file will credit or debit quotas. </p>
+
+ <p> Quotas are persistent with the <code>fsimage</code>. When starting, if the <code>fsimage</code> is immediately in
+violation of a quota (perhaps the <code>fsimage</code> was surreptitiously modified), a warning is printed for
+each such violation. Setting or removing a quota creates a journal entry. </p>
+
+ </section>
+
+<section>
+
+ <title>Administrative Commands</title>
+
+ <p> Quotas are managed by a set of commands available only to the administrator. </p>
+
+<ul>
+
+ <li> <code>dfsadmin -setquota &lt;N> &lt;directory>...&lt;directory></code> <br /> Set the name quota to be <code>N</code> for
+each directory. Best effort for each directory, with faults reported if <code>N</code> is not a positive long integer, the
+directory does not exist or it is a file, or the directory would immediately exceed the new quota. </li>
+
+ <li> <code>dfsadmin -clrquota &lt;directory>...&lt;directory></code><br /> Remove any name quota for each directory. Best
+effort for each directory, with faults reported if the directory does not exist or it is a file. It is not a fault if the
+directory has no quota. </li>
+
+ <li> <code>dfsadmin -setspacequota &lt;N> &lt;directory>...&lt;directory></code> <br /> Set the space quota to be
+N&times;2<sup>30</sup> bytes (GB) for each directory. Best effort for each directory, with faults reported if <code>N</code> is
+neither zero nor a positive integer, the directory does not exist or it is a file, or the directory would immediately exceed
+the new quota. </li>
+
+ <li> <code>dfsadmin -clrspacequota &lt;directory>...&lt;directory></code><br /> Remove any space quota for each directory. Best
+effort for each directory, with faults reported if the directory does not exist or it is a file. It is not a fault if the
+directory has no quota. </li>
+
+ </ul>
+
+</section>
+
+<section>
+
+ <title>Reporting Command</title>
+
+ <p> An extension to the <code>count</code> command of the HDFS shell reports quota values and the current count of names and bytes in use. </p>
+
+<ul>
+	
+	<li>
+
+ <code>fs -count -q &lt;directory>...&lt;directory></code><br /> With the <code>-q</code> option, also report the name quota
+value set for each directory, the available name quota remaining, the space quota value set, and the available space quota
+remaining. If the directory does not have a quota set, the reported values are <code>none</code> and <code>inf</code>. Space
+values are rounded to multiples of 2<sup>30</sup> bytes (GB).
+
+ </li>
+
+ </ul> </section>
+
+</body>
+
 </document>

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Sep 19 16:40:54 2008
@@ -500,7 +500,8 @@
     try {
       return namenode.setReplication(src, replication);
     } catch(RemoteException re) {
-      throw re.unwrapRemoteException(AccessControlException.class);
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     QuotaExceededException.class);
     }
   }
 
@@ -840,32 +841,24 @@
   }
 
   /**
-   * Remove the quota for a directory
-   * @param path The string representation of the path to the directory
-   * @throws FileNotFoundException if the path is not a directory
+   * Sets or resets quotas for a directory.
+   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long)
    */
-  void clearQuota(String src) throws IOException {
-    try {
-      namenode.clearQuota(src);
-    } catch(RemoteException re) {
-      throw re.unwrapRemoteException(AccessControlException.class,
-                                     FileNotFoundException.class);
+  void setQuota(String src, long namespaceQuota, long diskspaceQuota) 
+                                                 throws IOException {
+    // sanity check
+    if ((namespaceQuota <= 0 && namespaceQuota != FSConstants.QUOTA_DONT_SET &&
+         namespaceQuota != FSConstants.QUOTA_RESET) ||
+        (diskspaceQuota <= 0 && diskspaceQuota != FSConstants.QUOTA_DONT_SET &&
+         diskspaceQuota != FSConstants.QUOTA_RESET)) {
+      throw new IllegalArgumentException("Invalid values for quota : " +
+                                         namespaceQuota + " and " + 
+                                         diskspaceQuota);
+                                         
     }
-  }
-  
-  /**
-   * Set the quota for a directory.
-   * @param path  The string representation of the path to the directory
-   * @param quota The limit of the number of names in the tree rooted 
-   *              at the directory
-   * @throws FileNotFoundException if the path is a file or 
-   *                               does not exist 
-   * @throws QuotaExceededException if the directory size 
-   *                                is greater than the given quota
-   */
-  void setQuota(String src, long quota) throws IOException {
+    
     try {
-      namenode.setQuota(src, quota);
+      namenode.setQuota(src, namespaceQuota, diskspaceQuota);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
                                      FileNotFoundException.class,
@@ -2008,7 +2001,7 @@
     private DatanodeInfo[] nodes = null; // list of targets for current block
     private volatile boolean hasError = false;
     private volatile int errorIndex = 0;
-    private IOException lastException = null;
+    private volatile IOException lastException = null;
     private long artificialSlowdown = 0;
     private long lastFlushOffset = -1; // offset when flush was invoked
     private boolean persistBlocks = false; // persist blocks on namenode
@@ -2016,6 +2009,12 @@
     private int maxRecoveryErrorCount = 5; // try block recovery 5 times
     private volatile boolean appendChunk = false;   // appending to existing partial block
 
+    private void setLastException(IOException e) {
+      if (lastException == null) {
+        lastException = e;
+      }
+    }
+    
     private class Packet {
       ByteBuffer buffer;           // only one of buf and buffer is non-null
       byte[]  buf;
@@ -2206,6 +2205,9 @@
             } catch (Throwable e) {
               LOG.warn("DataStreamer Exception: " + 
                        StringUtils.stringifyException(e));
+              if (e instanceof IOException) {
+                setLastException((IOException)e);
+              }
               hasError = true;
             }
           }
@@ -2334,6 +2336,9 @@
           } catch (Exception e) {
             if (!closed) {
               hasError = true;
+              if (e instanceof IOException) {
+                setLastException((IOException)e);
+              }
               LOG.warn("DFSOutputStream ResponseProcessor exception " + 
                        " for block " + block +
                         StringUtils.stringifyException(e));
@@ -2395,8 +2400,9 @@
       while (!success && clientRunning) {
         DatanodeInfo[] newnodes = null;
         if (nodes == null) {
-          lastException = new IOException("Could not get block locations. " +
-                                          "Aborting...");
+          String msg = "Could not get block locations. Aborting...";
+          LOG.warn(msg);
+          setLastException(new IOException(msg));
           closed = true;
           if (streamer != null) streamer.close();
           return false;
@@ -2473,6 +2479,7 @@
         }
 
         this.hasError = false;
+        lastException = null;
         errorIndex = 0;
         success = createBlockOutputStream(nodes, clientName, true);
       }
@@ -2646,6 +2653,7 @@
       boolean success;
       do {
         hasError = false;
+        lastException = null;
         errorIndex = 0;
         retry = false;
         nodes = null;
@@ -2756,6 +2764,7 @@
           }
         }
         hasError = true;
+        setLastException(ie);
         blockReplyStream = null;
         return false;  // error
       }
@@ -2771,6 +2780,14 @@
           try {
             return namenode.addBlock(src, clientName);
           } catch (RemoteException e) {
+            IOException ue = 
+              e.unwrapRemoteException(FileNotFoundException.class,
+                                      AccessControlException.class,
+                                      QuotaExceededException.class);
+            if (ue != e) { 
+              throw ue; // no need to retry these exceptions
+            }
+            
             if (--retries == 0 && 
                 !NotReplicatedYetException.class.getName().
                 equals(e.getClassName())) {

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Sep 19 16:40:54 2008
@@ -206,22 +206,12 @@
     return dfs.getContentSummary(getPathName(f));
   }
 
-  /** Clear a directory's quota
-   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#clearQuota(String)
+  /** Set a directory's quotas
+   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long) 
    */
-  public void clearQuota(Path src) throws IOException {
-    dfs.clearQuota(getPathName(src));
-  }
-  
-  /** Set a directory's quota
-   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long) 
-   */
-  public void setQuota(Path src, long quota) throws IOException {
-    if (quota <= 0) {
-      throw new IllegalArgumentException("Quota should be a positive number: "
-          + quota);
-    }
-    dfs.setQuota(getPathName(src), quota);
+  public void setQuota(Path src, long namespaceQuota, long diskspaceQuota) 
+                       throws IOException {
+    dfs.setQuota(getPathName(src), namespaceQuota, diskspaceQuota);
   }
   
   private FileStatus makeQualified(FileStatus f) {

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Sep 19 16:40:54 2008
@@ -39,9 +39,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 39: removed DFSFileInfo (Use FileStatus instead)
+   * 40: added disk space quotas.
    */
-  public static final long versionID = 39L;
+  public static final long versionID = 40L;
   
   ///////////////////////////////////////
   // File contents
@@ -409,21 +409,24 @@
   /**
    * Set the quota for a directory.
    * @param path  The string representation of the path to the directory
-   * @param quota The limit of the number of names in the tree rooted 
-   *              at the directory
+   * @param namespaceQuota Limit on the number of names in the tree rooted 
+   *                       at the directory
+   * @param diskspaceQuota Limit on disk space occupied by all the files under
+   *                       this directory. 
+   * <br><br>
+   *                       
+   * The quota can have three types of values : (1) 0 or more will set 
+   * the quota to that value, (2) {@link FSConstants#QUOTA_DONT_SET}  implies 
+   * the quota will not be changed, and (3) {@link FSConstants#QUOTA_RESET} 
+   * implies the quota will be reset. Any other value is a runtime error.
+   *                        
    * @throws FileNotFoundException if the path is a file or 
    *                               does not exist 
    * @throws QuotaExceededException if the directory size 
    *                                is greater than the given quota
    */
-  public void setQuota(String path, long quota) throws IOException;
-  
-  /**
-   * Remove the quota for a directory
-   * @param path The string representation of the path to the directory
-   * @throws FileNotFoundException if the path is not a directory
-   */
-  public void clearQuota(String path) throws IOException;
+  public void setQuota(String path, long namespaceQuota, long diskspaceQuota)
+                      throws IOException;
   
   /**
    * Write all metadata for this file into persistent storage.

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java Fri Sep 19 16:40:54 2008
@@ -116,6 +116,10 @@
   // Chunk the block Invalidate message
   public static final int BLOCK_INVALIDATE_CHUNK = 100;
 
+  // Long that indicates "leave current quota unchanged"
+  public static final long QUOTA_DONT_SET = Long.MAX_VALUE;
+  public static final long QUOTA_RESET = -1L;
+  
   //
   // Timeouts, constants
   //
@@ -190,7 +194,7 @@
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -17;
+  public static final int LAYOUT_VERSION = -18;
   // Current version: 
-  // Support Access time on files
+  // Support disk space quotas
 }

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java Fri Sep 19 16:40:54 2008
@@ -20,23 +20,33 @@
 
 import java.io.IOException;
 
-/** This class is for the error when an attempt to add an inode to namespace 
- * violates the quota restriction of any inode on the path to the newly added
- * inode.
+/** 
+ * This exception is thrown when modification to HDFS results in violation
+ * of a directory quota. A directory quota might be namespace quota (limit 
+ * on number of files and directories) or a diskspace quota (limit on space 
+ * taken by all the files under the directory tree). <br> <br>
+ * 
+ * The message for the exception specifies the directory where the quota
+ * was violated and actual quotas.
  */
 public final class QuotaExceededException extends IOException {
   private static final long serialVersionUID = 1L;
   private String pathName;
-  private long quota;
-  private long count;
+  private long nsQuota;
+  private long nsCount;
+  private long dsQuota;
+  private long diskspace;
   
   public QuotaExceededException(String msg) {
     super(msg);
   }
   
-  public QuotaExceededException(long quota, long count) {
-    this.quota = quota;
-    this.count = count;
+  public QuotaExceededException(long nsQuota, long nsCount,
+                                long dsQuota, long diskspace) {
+    this.nsQuota = nsQuota;
+    this.nsCount = nsCount;
+    this.dsQuota = dsQuota;
+    this.diskspace = diskspace;
   }
   
   public void setPathName(String path) {
@@ -47,7 +57,9 @@
     String msg = super.getMessage();
     if (msg == null) {
       return "The quota" + (pathName==null?"":(" of " + pathName)) + 
-          " is exceeded: quota=" + quota + " count=" + count;
+          " is exceeded: namespace quota=" + nsQuota + " file count=" + 
+          nsCount + ", diskspace quota=" + dsQuota + 
+          " diskspace=" + diskspace; 
     } else {
       return msg;
     }

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Sep 19 16:40:54 2008
@@ -61,7 +61,7 @@
   public FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) throws IOException {
     rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
         ns.createFsOwnerPermissions(new FsPermission((short)0755)),
-        Integer.MAX_VALUE);
+        Integer.MAX_VALUE, -1);
     this.fsImage = fsImage;
     namesystem = ns;
     initialize(conf);
@@ -154,7 +154,7 @@
                                  preferredBlockSize, modTime, clientName, 
                                  clientMachine, clientNode);
     synchronized (rootDir) {
-      newNode = addNode(path, newNode, false);
+      newNode = addNode(path, newNode, -1, false);
     }
     if (newNode == null) {
       NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
@@ -180,14 +180,17 @@
                             long atime,
                             long preferredBlockSize) {
     INode newNode;
+    long diskspace = -1; // unknown
     if (blocks == null)
       newNode = new INodeDirectory(permissions, modificationTime);
-    else 
+    else {
       newNode = new INodeFile(permissions, blocks.length, replication,
                               modificationTime, atime, preferredBlockSize);
+      diskspace = ((INodeFile)newNode).diskspaceConsumed(blocks);
+    }
     synchronized (rootDir) {
       try {
-        newNode = addNode(path, newNode, false);
+        newNode = addNode(path, newNode, diskspace, false);
         if(newNode != null && blocks != null) {
           int nrBlocks = blocks.length;
           // Add file->block mapping
@@ -210,14 +213,16 @@
                               short replication,
                               long modificationTime,
                               long atime,
-                              long quota,
+                              long nsQuota,
+                              long dsQuota,
                               long preferredBlockSize) {
+    // NOTE: This does not update space counts for parents
     // create new inode
     INode newNode;
     if (blocks == null) {
-      if (quota >= 0) {
+      if (nsQuota >= 0 || dsQuota >= 0) {
         newNode = new INodeDirectoryWithQuota(
-            permissions, modificationTime, quota);
+            permissions, modificationTime, nsQuota, dsQuota);
       } else {
         newNode = new INodeDirectory(permissions, modificationTime);
       }
@@ -249,12 +254,16 @@
   /**
    * Add a block to the file. Returns a reference to the added block.
    */
-  Block addBlock(String path, INode file, Block block) throws IOException {
+  Block addBlock(String path, INode[] inodes, Block block) throws IOException {
     waitForReady();
 
     synchronized (rootDir) {
-      INodeFile fileNode = (INodeFile) file;
+      INodeFile fileNode = (INodeFile) inodes[inodes.length-1];
 
+      // check quota limits and update space consumed
+      updateCount(inodes, inodes.length-1, 0, 
+                  fileNode.getPreferredBlockSize()*fileNode.getReplication());
+      
       // associate the new list of blocks with this file
       namesystem.blocksMap.addINode(block, fileNode);
       BlockInfo blockInfo = namesystem.blocksMap.getStoredBlock(block);
@@ -347,10 +356,8 @@
    */
   boolean unprotectedRenameTo(String src, String dst, long timestamp) 
   throws QuotaExceededException {
-    byte[][] srcComponents = INode.getPathComponents(src);
-    INode[] srcInodes = new INode[srcComponents.length];
     synchronized (rootDir) {
-      rootDir.getExistingPathINodes(srcComponents, srcInodes);
+      INode[] srcInodes = rootDir.getExistingPathINodes(src);
 
       // check the validation of the source
       if (srcInodes[srcInodes.length-1] == null) {
@@ -463,13 +470,20 @@
     oldReplication[0] = -1;
     Block[] fileBlocks = null;
     synchronized(rootDir) {
-      INode inode = rootDir.getNode(src);
+      INode[] inodes = rootDir.getExistingPathINodes(src);
+      INode inode = inodes[inodes.length - 1];
       if (inode == null)
         return null;
       if (inode.isDirectory())
         return null;
       INodeFile fileNode = (INodeFile)inode;
       oldReplication[0] = fileNode.getReplication();
+
+      // check disk quota
+      long dsDelta = (replication - oldReplication[0]) *
+           (fileNode.diskspaceConsumed()/oldReplication[0]);
+      updateCount(inodes, inodes.length-1, 0, dsDelta);
+
       fileNode.setReplication(replication);
       fileBlocks = fileNode.getBlocks();
     }
@@ -584,12 +598,9 @@
    */ 
   INode unprotectedDelete(String src, long modificationTime) {
     src = normalizePath(src);
-    String[] names = INode.getPathNames(src);
-    byte[][] components = INode.getPathComponents(names);
-    INode[] inodes = new INode[components.length];
 
     synchronized (rootDir) {
-      rootDir.getExistingPathINodes(components, inodes);
+      INode[] inodes =  rootDir.getExistingPathINodes(src);
       INode targetNode = inodes[inodes.length-1];
 
       if (targetNode == null) { // non-existent src
@@ -630,8 +641,18 @@
    * Replaces the specified inode with the specified one.
    */
   void replaceNode(String path, INodeFile oldnode, INodeFile newnode) 
-                      throws IOException {
+                                                   throws IOException {
+    replaceNode(path, oldnode, newnode, true);
+  }
+  
+  /**
+   * @see #replaceNode(String, INodeFile, INodeFile)
+   */
+  private void replaceNode(String path, INodeFile oldnode, INodeFile newnode,
+                           boolean updateDiskspace) throws IOException {    
     synchronized (rootDir) {
+      long dsOld = oldnode.diskspaceConsumed();
+      
       //
       // Remove the node from the namespace 
       //
@@ -641,7 +662,25 @@
         throw new IOException("FSDirectory.replaceNode: " +
                               "failed to remove " + path);
       } 
+      
+      /* Currently oldnode and newnode are assumed to contain the same
+       * blocks. Otherwise, blocks need to be removed from the blocksMap.
+       */
+      
       rootDir.addNode(path, newnode); 
+
+      //check if disk space needs to be updated.
+      long dsNew = 0;
+      if (updateDiskspace && (dsNew = newnode.diskspaceConsumed()) != dsOld) {
+        try {
+          updateSpaceConsumed(path, 0, dsNew-dsOld);
+        } catch (QuotaExceededException e) {
+          // undo
+          replaceNode(path, newnode, oldnode, false);
+          throw e;
+        }
+      }
+      
       int index = 0;
       for (Block b : newnode.getBlocks()) {
         BlockInfo info = namesystem.blocksMap.addINode(b, newnode);
@@ -725,6 +764,24 @@
     }
   }
 
+  /**
+   * Retrieve the existing INodes along the given path.
+   * 
+   * @param path the path to explore
+   * @return INodes array containing the existing INodes in the order they
+   *         appear when following the path from the root INode to the
+   *         deepest INodes. The array size will be the number of expected
+   *         components in the path, and non existing components will be
+   *         filled with null
+   *         
+   * @see INodeDirectory#getExistingPathINodes(byte[][], INode[])
+   */
+  INode[] getExistingPathINodes(String path) {
+    synchronized (rootDir){
+      return rootDir.getExistingPathINodes(path);
+    }
+  }
+  
   /** 
    * Check whether the filepath could be created
    */
@@ -751,39 +808,68 @@
     }
   }
 
+  /** Updates namespace and diskspace consumed for all
+   * directories until the parent directory of file represented by path.
+   * 
+   * @param path path for the file.
+   * @param nsDelta the delta change of namespace
+   * @param dsDelta the delta change of diskspace
+   * @throws QuotaExceededException if the new count violates any quota limit
+   * @throws FileNotFoundException if path does not exist.
+   */
+  void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
+                                         throws QuotaExceededException,
+                                                FileNotFoundException {
+    synchronized (rootDir) {
+      INode[] inodes = rootDir.getExistingPathINodes(path);
+      int len = inodes.length;
+      if (inodes[len - 1] == null) {
+        throw new FileNotFoundException(path + 
+                                        " does not exist under rootDir.");
+      }
+      updateCount(inodes, len-1, nsDelta, dsDelta);
+    }
+  }
+  
   /** update count of each inode with quota
    * 
    * @param inodes an array of inodes on a path
    * @param numOfINodes the number of inodes to update starting from index 0
-   * @param deltaCount the delta change of the count
+   * @param nsDelta the delta change of namespace
+   * @param dsDelta the delta change of diskspace
    * @throws QuotaExceededException if the new count violates any quota limit
    */
-  private static void updateCount(
-      INode[] inodes, int numOfINodes, long deltaCount )
-  throws QuotaExceededException {
+  private void updateCount(INode[] inodes, int numOfINodes, 
+                           long nsDelta, long dsDelta)
+                           throws QuotaExceededException {
+    if (!ready) {
+      //still initializing. do not check or update quotas.
+      return;
+    }
     if (numOfINodes>inodes.length) {
       numOfINodes = inodes.length;
     }
     // check existing components in the path  
-    List<INodeDirectoryWithQuota> inodesWithQuota = 
-      new ArrayList<INodeDirectoryWithQuota>(numOfINodes);
     int i=0;
     try {
       for(; i < numOfINodes; i++) {
-        if (inodes[i].getQuota() >= 0) { // a directory with quota
-          INodeDirectoryWithQuota quotaINode =(INodeDirectoryWithQuota)inodes[i]; 
-          quotaINode.updateNumItemsInTree(deltaCount);
-          inodesWithQuota.add(quotaINode);
+        if (inodes[i].isQuotaSet()) { // a directory with quota
+          INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
+          node.updateNumItemsInTree(nsDelta, dsDelta);
         }
       }
     } catch (QuotaExceededException e) {
-      for (INodeDirectoryWithQuota quotaINode:inodesWithQuota) {
+      e.setPathName(getFullPathName(inodes, i));
+      // undo updates
+      for( ; i-- > 0; ) {
         try {
-          quotaINode.updateNumItemsInTree(-deltaCount);
+          if (inodes[i].isQuotaSet()) { // a directory with quota
+            INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
+            node.updateNumItemsInTree(-nsDelta, -dsDelta);
+          }
         } catch (IOException ingored) {
         }
       }
-      e.setPathName(getFullPathName(inodes, i));
       throw e;
     }
   }
@@ -881,17 +967,19 @@
         inheritPermission );
   }
   
-  /** Add a node child to the namespace. The full path name of the node is src. 
+  /** Add a node child to the namespace. The full path name of the node is src.
+   * childDiskspace should be -1, if unknown. 
    * QuotaExceededException is thrown if it violates quota limit */
   private <T extends INode> T addNode(String src, T child, 
-      boolean inheritPermission) 
+        long childDiskspace, boolean inheritPermission) 
   throws QuotaExceededException {
     byte[][] components = INode.getPathComponents(src);
     child.setLocalName(components[components.length-1]);
     INode[] inodes = new INode[components.length];
     synchronized (rootDir) {
       rootDir.getExistingPathINodes(components, inodes);
-      return addChild(inodes, inodes.length-1, child, inheritPermission);
+      return addChild(inodes, inodes.length-1, child, childDiskspace,
+                      inheritPermission);
     }
   }
   
@@ -900,12 +988,25 @@
    * QuotaExceededException is thrown if it violates quota limit */
   private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
       boolean inheritPermission) throws QuotaExceededException {
-    long childSize = child.numItemsInTree();
-    updateCount(pathComponents, pos, childSize);
+    return addChild(pathComponents, pos, child, -1, inheritPermission);
+  }
+  
+  /** Add a node child to the inodes at index pos. 
+   * Its ancestors are stored at [0, pos-1]. 
+   * QuotaExceededException is thrown if it violates quota limit */
+  private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
+       long childDiskspace, boolean inheritPermission) throws QuotaExceededException {
+    INode.DirCounts counts = new INode.DirCounts();
+    child.spaceConsumedInTree(counts);
+    if (childDiskspace < 0) {
+      childDiskspace = counts.getDsCount();
+    }
+    updateCount(pathComponents, pos, counts.getNsCount(), childDiskspace);
     T addedNode = ((INodeDirectory)pathComponents[pos-1]).addChild(
         child, inheritPermission);
     if (addedNode == null) {
-      updateCount(pathComponents, pos, -childSize);
+      updateCount(pathComponents, pos, 
+                  -counts.getNsCount(), -childDiskspace);
     }
     return addedNode;
   }
@@ -920,8 +1021,10 @@
     INode removedNode = 
       ((INodeDirectory)pathComponents[pos-1]).removeChild(pathComponents[pos]);
     if (removedNode != null) {
-      updateCount(pathComponents, pos, 
-          -removedNode.numItemsInTree());
+      INode.DirCounts counts = new INode.DirCounts();
+      removedNode.spaceConsumedInTree(counts);
+      updateCount(pathComponents, pos,
+                  -counts.getNsCount(), -counts.getDsCount());
     }
     return removedNode;
   }
@@ -952,117 +1055,149 @@
    * A directory's count is defined as the total number inodes in the tree
    * rooted at the directory.
    * 
-   * @throws QuotaExceededException if the count update violates 
-   *                                any quota limitation
+   * This is an update of existing state of the filesystem and does not
+   * throw QuotaExceededException.
    */
-  void updateCountForINodeWithQuota() throws QuotaExceededException {
-    updateCountForINodeWithQuota(rootDir);
+  void updateCountForINodeWithQuota() {
+    updateCountForINodeWithQuota(rootDir, new INode.DirCounts(), 
+                                 new ArrayList<INode>(50));
   }
   
-  /** Update the count of the directory if it has a quota and return the count
+  /** 
+   * Update the count of the directory if it has a quota and return the count
+   * 
+   * This does not throw a QuotaExceededException. This is just an update
+   * of existing state and throwing QuotaExceededException does not help
+   * with fixing the state, if there is a problem.
    * 
-   * @param node the root of the tree that represents the directory
+   * @param dir the root of the tree that represents the directory
+   * @param counters counters for name space and disk space
+   * @param nodesInPath INodes for each of the components in the path.
    * @return the size of the tree
-   * @throws QuotaExceededException if the count is greater than its quota
    */
-  private static long updateCountForINodeWithQuota(INode node) throws QuotaExceededException {
-    long count = 1L;
-    if (node.isDirectory()) {
-      INodeDirectory dNode = (INodeDirectory)node;
-      for (INode child : dNode.getChildren()) {
-        count += updateCountForINodeWithQuota(child);
-      }
-      if (dNode.getQuota()>=0) {
-        ((INodeDirectoryWithQuota)dNode).setCount(count);
+  private static void updateCountForINodeWithQuota(INodeDirectory dir, 
+                                               INode.DirCounts counts,
+                                               ArrayList<INode> nodesInPath) {
+    long parentNamespace = counts.nsCount;
+    long parentDiskspace = counts.dsCount;
+    
+    counts.nsCount = 1L;//for self. should not call node.spaceConsumedInTree()
+    counts.dsCount = 0L;
+    
+    /* We don't need nodesInPath if we could use 'parent' field in 
+     * INode. using 'parent' is not currently recommended. */
+    nodesInPath.add(dir);
+
+    for (INode child : dir.getChildren()) {
+      if (child.isDirectory()) {
+        updateCountForINodeWithQuota((INodeDirectory)child, 
+                                     counts, nodesInPath);
+      } else { // reduce recursive calls
+        counts.nsCount += 1;
+        counts.dsCount += ((INodeFile)child).diskspaceConsumed();
       }
     }
-    return count;
+      
+    if (dir.isQuotaSet()) {
+      ((INodeDirectoryWithQuota)dir).setSpaceConsumed(counts.nsCount,
+                                                      counts.dsCount);
+
+      // check if quota is violated for some reason.
+      if ((dir.getNsQuota() >= 0 && counts.nsCount > dir.getNsQuota()) ||
+          (dir.getDsQuota() >= 0 && counts.dsCount > dir.getDsQuota())) {
+
+        // can only happen because of a software bug. the bug should be fixed.
+        StringBuilder path = new StringBuilder(512);
+        for (INode n : nodesInPath) {
+          path.append('/');
+          path.append(n.getLocalName());
+        }
+        
+        NameNode.LOG.warn("Unexpected quota violation in image for " + path + 
+                          " (Namespace quota : " + dir.getNsQuota() +
+                          " consumed : " + counts.nsCount + ")" +
+                          " (Diskspace quota : " + dir.getDsQuota() +
+                          " consumed : " + counts.dsCount + ").");
+      }            
+    }
+      
+    // pop 
+    nodesInPath.remove(nodesInPath.size()-1);
+    
+    counts.nsCount += parentNamespace;
+    counts.dsCount += parentDiskspace;
   }
   
   /**
-   * Set the quota for a directory.
-   * @param path The string representation of the path to the directory
-   * @param quota The limit of the number of names in or below the directory
+   * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
+   * Sets quota for a directory.
+   * @return INodeDirectory if any of the quotas have changed. null otherwise.
    * @throws FileNotFoundException if the path does not exist or is a file
    * @throws QuotaExceededException if the directory tree size is 
    *                                greater than the given quota
    */
-  void unprotectedSetQuota(String src, long quota)
-  throws FileNotFoundException, QuotaExceededException {
+  INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota) 
+                       throws FileNotFoundException, QuotaExceededException {
+    // sanity check
+    if ((nsQuota < 0 && nsQuota != FSConstants.QUOTA_DONT_SET && 
+         nsQuota < FSConstants.QUOTA_RESET) || 
+        (dsQuota < 0 && dsQuota != FSConstants.QUOTA_DONT_SET && 
+          dsQuota < FSConstants.QUOTA_RESET)) {
+      throw new IllegalArgumentException("Illegal value for nsQuota or " +
+                                         "dsQuota : " + nsQuota + " and " +
+                                         dsQuota);
+    }
+    
     String srcs = normalizePath(src);
-    byte[][] components = INode.getPathComponents(src);
-    INode[] inodes = new INode[components.length==1?1:2];
-    synchronized (rootDir) {
-      rootDir.getExistingPathINodes(components, inodes);
-      INode targetNode = inodes[inodes.length-1];
-      if (targetNode == null) {
-        throw new FileNotFoundException("Directory does not exist: " + srcs);
-      } else if (!targetNode.isDirectory()) {
-        throw new FileNotFoundException("Cannot set quota on a file: " + srcs);  
-      } else { // a directory inode
-        INodeDirectory dirNode = (INodeDirectory)targetNode;
-        if (dirNode instanceof INodeDirectoryWithQuota) { 
-          // a directory with quota; so set the quota to the new value
-          ((INodeDirectoryWithQuota)dirNode).setQuota(quota);
-        } else {
-          // a non-quota directory; so replace it with a directory with quota
-          INodeDirectoryWithQuota newNode = 
-            new INodeDirectoryWithQuota(quota, dirNode);
-          // non-root directory node; parent != null
-          assert inodes.length==2;
-          INodeDirectory parent = (INodeDirectory)inodes[0];
-          parent.replaceChild(newNode);
-        }
+    INode[] inodes = rootDir.getExistingPathINodes(src);
+    INode targetNode = inodes[inodes.length-1];
+    if (targetNode == null) {
+      throw new FileNotFoundException("Directory does not exist: " + srcs);
+    } else if (!targetNode.isDirectory()) {
+      throw new FileNotFoundException("Cannot set quota on a file: " + srcs);  
+    } else { // a directory inode
+      INodeDirectory dirNode = (INodeDirectory)targetNode;
+      long oldNsQuota = dirNode.getNsQuota();
+      long oldDsQuota = dirNode.getDsQuota();
+      if (nsQuota == FSConstants.QUOTA_DONT_SET) {
+        nsQuota = oldNsQuota;
+      }
+      if (dsQuota == FSConstants.QUOTA_DONT_SET) {
+        dsQuota = oldDsQuota;
+      }        
+
+      if (dirNode instanceof INodeDirectoryWithQuota) { 
+        // a directory with quota; so set the quota to the new value
+        ((INodeDirectoryWithQuota)dirNode).setQuota(nsQuota, dsQuota);
+      } else {
+        // a non-quota directory; so replace it with a directory with quota
+        INodeDirectoryWithQuota newNode = 
+          new INodeDirectoryWithQuota(nsQuota, dsQuota, dirNode);
+        // non-root directory node; parent != null
+        INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
+        dirNode = newNode;
+        parent.replaceChild(newNode);
       }
+      return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
     }
   }
   
   /**
-   * @see #unprotectedSetQuota(String, long)
-   */
-  void setQuota(String src, long quota) 
-  throws FileNotFoundException, QuotaExceededException {
-    unprotectedSetQuota(src, quota);
-    fsImage.getEditLog().logSetQuota(src, quota);
-  }
-  
-  /**
-   * Remove the quota for a directory
-   * @param src The string representation of the path to the directory
-   * @throws FileNotFoundException if the path does not exist or it is a file
-   */
-  void unprotectedClearQuota(String src) throws IOException {
-    String srcs = normalizePath(src);
-    byte[][] components = INode.getPathComponents(src);
-    INode[] inodes = new INode[components.length==1?1:2];
-    synchronized (rootDir) {
-      rootDir.getExistingPathINodes(components, inodes);
-      INode targetNode = inodes[inodes.length-1];
-      if (targetNode == null || !targetNode.isDirectory()) {
-        throw new FileNotFoundException("Directory does not exist: " + srcs);
-      } else if (targetNode instanceof INodeDirectoryWithQuota) {
-        // a directory inode with quota
-        // replace the directory with quota with a non-quota one
-        INodeDirectoryWithQuota dirNode = (INodeDirectoryWithQuota)targetNode;
-        INodeDirectory newNode = new INodeDirectory(dirNode);
-        if (dirNode == rootDir) { // root
-          throw new IOException("Can't clear the root's quota");
-        } else { // non-root directory node; parent != null
-          INodeDirectory parent = (INodeDirectory)inodes[0];
-          parent.replaceChild(newNode);
-        }
+   * See {@link ClientProtocol#setQuota(String, long, long)} for the 
+   * contract.
+   * @see #unprotectedSetQuota(String, long, long)
+   */
+  void setQuota(String src, long nsQuota, long dsQuota) 
+                throws FileNotFoundException, QuotaExceededException {
+    synchronized (rootDir) {    
+      INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
+      if (dir != null) {
+        fsImage.getEditLog().logSetQuota(src, dir.getNsQuota(), 
+                                         dir.getDsQuota());
       }
     }
   }
   
-  /**
-   * @see #unprotectedClearQuota(String)
-   */
-  void clearQuota(String src) throws IOException {
-    unprotectedClearQuota(src);
-    fsImage.getEditLog().logClearQuota(src);
-  }
-  
   long totalInodes() {
     synchronized (rootDir) {
       return rootDir.numItemsInTree();

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Sep 19 16:40:54 2008
@@ -62,9 +62,12 @@
   private static final byte OP_SET_OWNER = 8;
   private static final byte OP_CLOSE = 9;    // close after write
   private static final byte OP_SET_GENSTAMP = 10;    // store genstamp
-  private static final byte OP_SET_QUOTA = 11; // set a directory's quota
-  private static final byte OP_CLEAR_QUOTA = 12; // clear a directory's quota
+  /* The following two are not used any more. Should be removed once
+   * LAST_UPGRADABLE_LAYOUT_VERSION is -17 or newer. */
+  private static final byte OP_SET_NS_QUOTA = 11; // set namespace quota
+  private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
   private static final byte OP_TIMES = 13; // sets mod & access time on a file
+  private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
   private static int sizeFlushBuffer = 512*1024;
 
   private ArrayList<EditLogOutputStream> editStreams = null;
@@ -732,23 +735,34 @@
               FSImage.readString(in), FSImage.readString(in));
           break;
         }
-        case OP_SET_QUOTA: {
+        case OP_SET_NS_QUOTA: {
           if (logVersion > -16) {
             throw new IOException("Unexpected opcode " + opcode
                 + " for version " + logVersion);
           }
           fsDir.unprotectedSetQuota(FSImage.readString(in), 
-              readLongWritable(in) );
+                                    readLongWritable(in), 
+                                    FSConstants.QUOTA_DONT_SET);
           break;
         }
-        case OP_CLEAR_QUOTA: {
+        case OP_CLEAR_NS_QUOTA: {
           if (logVersion > -16) {
             throw new IOException("Unexpected opcode " + opcode
                 + " for version " + logVersion);
           }
-          fsDir.unprotectedClearQuota(FSImage.readString(in));
+          fsDir.unprotectedSetQuota(FSImage.readString(in),
+                                    FSConstants.QUOTA_RESET,
+                                    FSConstants.QUOTA_DONT_SET);
           break;
         }
+
+        case OP_SET_QUOTA:
+          fsDir.unprotectedSetQuota(FSImage.readString(in),
+                                    readLongWritable(in),
+                                    readLongWritable(in));
+                                      
+          break;
+
         case OP_TIMES: {
           numOpTimes++;
           int length = in.readInt();
@@ -1016,23 +1030,16 @@
             FSEditLog.toLogReplication(replication));
   }
   
-  /** Add set quota record to edit log
+  /** Add set namespace quota record to edit log
    * 
    * @param src the string representation of the path to a directory
    * @param quota the directory size limit
    */
-  void logSetQuota(String src, long quota) {
-    logEdit(OP_SET_QUOTA, new UTF8(src), new LongWritable(quota));
+  void logSetQuota(String src, long nsQuota, long dsQuota) {
+    logEdit(OP_SET_QUOTA, new UTF8(src), 
+            new LongWritable(nsQuota), new LongWritable(dsQuota));
   }
 
-  /** Add clear quota record to edit log
-   * 
-   * @param src the string representation of the path to a directory
-   */
-  void logClearQuota(String src) {
-    logEdit(OP_CLEAR_QUOTA, new UTF8(src));
-  }
-  
   /**  Add set permissions record to edit log */
   void logSetPermissions(String src, FsPermission permissions) {
     logEdit(OP_SET_PERMISSIONS, new UTF8(src), permissions);

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Sep 19 16:40:54 2008
@@ -59,7 +59,6 @@
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 
 /**
  * FSImage handles checkpointing and logging of the namespace edits.
@@ -893,9 +892,13 @@
         }
         
         // get quota only when the node is a directory
-        long quota = -1L;
+        long nsQuota = -1L;
         if (imgVersion <= -16 && blocks == null) {
-          quota = in.readLong();
+          nsQuota = in.readLong();
+        }
+        long dsQuota = -1L;
+        if (imgVersion <= -18 && blocks == null) {
+          dsQuota = in.readLong();
         }
         
         PermissionStatus permissions = fsNamesys.getUpgradePermission();
@@ -904,8 +907,8 @@
         }
         if (path.length() == 0) { // it is the root
           // update the root's attributes
-          if (quota != -1) {
-            fsDir.rootDir.setQuota(quota);
+          if (nsQuota != -1 || dsQuota != -1) {
+            fsDir.rootDir.setQuota(nsQuota, dsQuota);
           }
           fsDir.rootDir.setModificationTime(modificationTime);
           fsDir.rootDir.setPermissionStatus(permissions);
@@ -918,7 +921,8 @@
         }
         // add new inode
         parentINode = fsDir.addToParent(path, parentINode, permissions,
-            blocks, replication, modificationTime, atime, quota, blockSize);
+                                        blocks, replication, modificationTime, 
+                                        atime, nsQuota, dsQuota, blockSize);
       }
       
       // load datanode info
@@ -927,8 +931,6 @@
       // load Files Under Construction
       this.loadFilesUnderConstruction(imgVersion, in, fsNamesys);
       
-      // update the count of each directory with quota
-      fsDir.updateCountForINodeWithQuota();
     } finally {
       in.close();
     }
@@ -968,6 +970,8 @@
       numEdits += FSEditLog.loadFSEdits(edits);
       edits.close();
     }
+    // update the counts.
+    FSNamesystem.getFSNamesystem().dir.updateCountForINodeWithQuota();    
     return numEdits;
   }
 
@@ -1109,7 +1113,8 @@
       out.writeLong(0);   // access time
       out.writeLong(0);   // preferred block size
       out.writeInt(-1);    // # of blocks
-      out.writeLong(node.getQuota());
+      out.writeLong(node.getNsQuota());
+      out.writeLong(node.getDsQuota());
       FILE_PERM.fromShort(node.getFsPermissionShort());
       PermissionStatus.write(out, node.getUserName(),
                              node.getGroupName(),

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Sep 19 16:40:54 2008
@@ -296,7 +296,7 @@
   /**
    * Initialize FSNamesystem.
    */
-  private synchronized void initialize(NameNode nn, Configuration conf) throws IOException {
+  private void initialize(NameNode nn, Configuration conf) throws IOException {
     this.systemStart = now();
     this.startTime = new Date(systemStart); 
     setConfigurationParameters(conf);
@@ -417,7 +417,7 @@
   /**
    * Initializes some of the members from configuration
    */
-  private synchronized void setConfigurationParameters(Configuration conf) 
+  private void setConfigurationParameters(Configuration conf) 
                                           throws IOException {
     fsNamesystemObject = this;
     try {
@@ -1269,13 +1269,18 @@
 
     // Allocate a new block and record it in the INode. 
     synchronized (this) {
-      INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
+      INode[] pathINodes = dir.getExistingPathINodes(src);
+      int inodesLen = pathINodes.length;
+      checkLease(src, clientName, pathINodes[inodesLen-1]);
+      INodeFileUnderConstruction pendingFile  = (INodeFileUnderConstruction) 
+                                                pathINodes[inodesLen - 1];
+                                                           
       if (!checkFileProgress(pendingFile, false)) {
         throw new NotReplicatedYetException("Not replicated yet:" + src);
       }
 
       // allocate new block record block locations in INode.
-      newBlock = allocateBlock(src, pendingFile);
+      newBlock = allocateBlock(src, pathINodes);
       pendingFile.setTargets(targets);
       
       for (DatanodeDescriptor dn : targets) {
@@ -1305,11 +1310,18 @@
     return true;
   }
   
-  // make sure that we still have the lease on this file
-  private INodeFileUnderConstruction checkLease(String src, String holder
-      ) throws IOException {
-    INode file = dir.getFileINode(src);
-    if (file == null) {
+  // make sure that we still have the lease on this file.
+  private INodeFileUnderConstruction checkLease(String src, String holder) 
+                                                      throws IOException {
+    INodeFile file = dir.getFileINode(src);
+    checkLease(src, holder, file);
+    return (INodeFileUnderConstruction)file;
+  }
+
+  private void checkLease(String src, String holder, INode file) 
+                                                     throws IOException {
+
+    if (file == null || file.isDirectory()) {
       Lease lease = leaseManager.getLease(new StringBytesWritable(holder));
       throw new LeaseExpiredException("No lease on " + src +
                                       " File does not exist. " +
@@ -1330,7 +1342,6 @@
       throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
           + pendingFile.getClientName() + " but is accessed by " + holder);
     }
-    return pendingFile;    
   }
 
   /**
@@ -1403,14 +1414,18 @@
     
   /**
    * Allocate a block at the given pending filename
+   * 
+   * @param src path to the file
+   * @param inodes INode representing each of the components of src. 
+   *        <code>inodes[inodes.length-1]</code> is the INode for the file.
    */
-  private Block allocateBlock(String src, INode file) throws IOException {
+  private Block allocateBlock(String src, INode[] inodes) throws IOException {
     Block b = null;
     do {
       b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 
                     getGenerationStamp());
     } while (isValidBlock(b));
-    b = dir.addBlock(src, file, b);
+    b = dir.addBlock(src, inodes, b);
     NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
                                  +src+ ". "+b);
     return b;
@@ -1727,32 +1742,16 @@
   }
 
   /**
-   * Set the quota for a directory.
-   * @param path The string representation of the path to the directory
-   * @param quota The limit of the number of names in or below the directory
-   * @throws IOException if the path is not a directory or the number of
-   * existing names in or below the directory is greater than the given quota
+   * Set the namespace quota and diskspace quota for a directory.
+   * See {@link ClientProtocol#setQuota(String, long, long)} for the 
+   * contract.
    */
-  void setQuota(String path, long quota) throws IOException {
+  void setQuota(String path, long nsQuota, long dsQuota) throws IOException {
     if (isPermissionEnabled) {
       checkSuperuserPrivilege();
     }
     
-    dir.setQuota(path, quota);
-    getEditLog().logSync();
-  }
-  
-  /**
-   * Remove the quota for a directory
-   * @param path The string representation of the path to the directory
-   * @throws IOException if the path is not a directory
-   */
-  void clearQuota(String path) throws IOException {
-    if (isPermissionEnabled) {
-      checkSuperuserPrivilege();
-    }
-    
-    dir.clearQuota(path);
+    dir.setQuota(path, nsQuota, dsQuota);
     getEditLog().logSync();
   }
   
@@ -2838,6 +2837,23 @@
             LOG.warn("Error in deleting bad block " + block + e);
           }
         }
+        
+        // Update space consumed if required.
+        INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
+        long diff = (file == null) ? 0 :
+                    (file.getPreferredBlockSize() - storedBlock.getNumBytes());
+        
+        if (diff > 0 && file.isUnderConstruction() &&
+            cursize < storedBlock.getNumBytes()) {
+          try {
+            String path = /* For finding parents */ 
+              leaseManager.findPath((INodeFileUnderConstruction)file);
+            dir.updateSpaceConsumed(path, 0, -diff*file.getReplication());
+          } catch (IOException e) {
+            LOG.warn("Unexpected exception while updating disk space : " +
+                     e.getMessage());
+          }
+        }
       }
       block = storedBlock;
     }

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java Fri Sep 19 16:40:54 2008
@@ -17,13 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.FileNotFoundException;
 import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Arrays;
 import java.util.List;
-import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ContentSummary;
@@ -31,8 +27,6 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 
 /**
@@ -46,6 +40,23 @@
   protected long modificationTime;
   protected int accessTime; // precise to the last hour
 
+  /** Simple wrapper for two counters : 
+   *  nsCount (namespace consumed) and dsCount (diskspace consumed).
+   */
+  static class DirCounts {
+    long nsCount = 0;
+    long dsCount = 0;
+    
+    /** returns namespace count */
+    long getNsCount() {
+      return nsCount;
+    }
+    /** returns diskspace count */
+    long getDsCount() {
+      return dsCount;
+    }
+  }
+  
   //Only updated by updatePermissionStatus(...).
   //Other codes should not modify it.
   private long permission;
@@ -173,12 +184,13 @@
 
   /** Compute {@link ContentSummary}. */
   public final ContentSummary computeContentSummary() {
-    long[] a = computeContentSummary(new long[]{0,0,0});
-    return new ContentSummary(a[0], a[1], a[2], getQuota());
+    long[] a = computeContentSummary(new long[]{0,0,0,0});
+    return new ContentSummary(a[0], a[1], a[2], getNsQuota(), 
+                              a[3], getDsQuota());
   }
   /**
    * @return an array of three longs. 
-   * 0: length, 1: file count, 2: directory count
+   * 0: length, 1: file count, 2: directory count 3: disk space
    */
   abstract long[] computeContentSummary(long[] summary);
   
@@ -186,19 +198,25 @@
    * Get the quota set for this inode
    * @return the quota if it is set; -1 otherwise
    */
-  long getQuota() {
+  long getNsQuota() {
     return -1;
   }
 
+  long getDsQuota() {
+    return -1;
+  }
+  
+  boolean isQuotaSet() {
+    return getNsQuota() >= 0 || getDsQuota() >= 0;
+  }
+  
   /**
-   * Get the total number of names in the tree
-   * rooted at this inode including the root
-   * @return The total number of names in this tree
+   * Adds total number of names and total disk space taken under 
+   * this tree to counts.
+   * Returns updated counts object.
    */
-  long numItemsInTree() {
-    return 1;
-  }
-    
+  abstract DirCounts spaceConsumedInTree(DirCounts counts);
+  
   /**
    * Get local file name
    * @return local file name

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Fri Sep 19 16:40:54 2008
@@ -190,6 +190,8 @@
    *         deepest INodes. The array size will be the number of expected
    *         components in the path, and non existing components will be
    *         filled with null
+   *         
+   * @see #getExistingPathINodes(byte[][], INode[])
    */
   INode[] getExistingPathINodes(String path) {
     byte[][] components = getPathComponents(path);
@@ -206,7 +208,7 @@
    * @param node INode to insert
    * @param inheritPermission inherit permission from parent?
    * @return  null if the child with this name already exists; 
-   *          inserted INode, otherwise
+   *          node, otherwise
    */
   <T extends INode> T addChild(final T node, boolean inheritPermission) {
     if (inheritPermission) {
@@ -300,17 +302,15 @@
     return parent;
   }
 
-  /**
-   */
-  long numItemsInTree() {
-    long total = 1L;
-    if (children == null) {
-      return total;
-    }
-    for (INode child : children) {
-      total += child.numItemsInTree();
+  /** {@inheritDoc} */
+  DirCounts spaceConsumedInTree(DirCounts counts) {
+    counts.nsCount += 1;
+    if (children != null) {
+      for (INode child : children) {
+        child.spaceConsumedInTree(counts);
+      }
     }
-    return total;
+    return counts;    
   }
 
   /** {@inheritDoc} */

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Fri Sep 19 16:40:54 2008
@@ -24,92 +24,139 @@
  * Directory INode class that has a quota restriction
  */
 class INodeDirectoryWithQuota extends INodeDirectory {
-  private long quota;
-  private long count;
+  private long nsQuota; /// NameSpace quota
+  private long nsCount;
+  private long dsQuota; /// disk space quota
+  private long diskspace;
   
   /** Convert an existing directory inode to one with the given quota
    * 
-   * @param quota Quota to be assigned to this inode
+   * @param nsQuota Namespace quota to be assigned to this inode
+   * @param dsQuota Diskspace quota to be assigned to this inode
    * @param other The other inode from which all other properties are copied
    */
-  INodeDirectoryWithQuota(long quota, INodeDirectory other)
+  INodeDirectoryWithQuota(long nsQuota, long dsQuota, INodeDirectory other)
   throws QuotaExceededException {
     super(other);
-    this.count = other.numItemsInTree();
-    setQuota(quota);
+    INode.DirCounts counts = new INode.DirCounts();
+    other.spaceConsumedInTree(counts);
+    this.nsCount= counts.getNsCount();
+    this.diskspace = counts.getDsCount();
+    setQuota(nsQuota, dsQuota);
   }
   
   /** constructor with no quota verification */
   INodeDirectoryWithQuota(
-      PermissionStatus permissions, long modificationTime, long quota)
+      PermissionStatus permissions, long modificationTime, 
+      long nsQuota, long dsQuota)
   {
     super(permissions, modificationTime);
-    this.quota = quota;
+    this.nsQuota = nsQuota;
+    this.dsQuota = dsQuota;
   }
   
   /** constructor with no quota verification */
-  INodeDirectoryWithQuota(String name, PermissionStatus permissions, long quota)
+  INodeDirectoryWithQuota(String name, PermissionStatus permissions, 
+                          long nsQuota, long dsQuota)
   {
     super(name, permissions);
-    this.quota = quota;
+    this.nsQuota = nsQuota;
+    this.dsQuota = dsQuota;
   }
   
-  /** Get this directory's quota
-   * @return this directory's quota
+  /** Get this directory's namespace quota
+   * @return this directory's namespace quota
    */
-  long getQuota() {
-    return quota;
+  long getNsQuota() {
+    return nsQuota;
+  }
+  
+  /** Get this directory's diskspace quota
+   * @return this directory's diskspace quota
+   */
+  long getDsQuota() {
+    return dsQuota;
   }
   
   /** Set this directory's quota
    * 
-   * @param quota Quota to be set
-   * @throws QuotaExceededException if the given quota is less than 
-   *                                the size of the tree
-   */
-  void setQuota(long quota) throws QuotaExceededException {
-    verifyQuota(quota, this.count);
-    this.quota = quota;
+   * @param newNsQuota Namespace quota to be set
+   * @param newDsQuota diskspace quota to be set
+   * @throws QuotaExceededException if quota is modified and the modified quota
+   *         is too low.
+   *                                
+   */
+  void setQuota(long newNsQuota, long newDsQuota) throws QuotaExceededException {
+    // if a quota is not changed, ignore that in verification.
+    if ((newNsQuota >=0 && newNsQuota != nsQuota && newNsQuota < nsCount)  ||
+        (newDsQuota >=0 && newDsQuota != dsQuota && newDsQuota < diskspace)) {
+      throw new QuotaExceededException(newNsQuota, nsCount, 
+                                       newDsQuota, diskspace);
+    }
+
+    nsQuota = newNsQuota;
+    dsQuota = newDsQuota;
   }
   
+  
+  @Override
+  DirCounts spaceConsumedInTree(DirCounts counts) {
+    counts.nsCount += nsCount;
+    counts.dsCount += diskspace;
+    return counts;
+  }
+
   /** Get the number of names in the subtree rooted at this directory
    * @return the size of the subtree rooted at this directory
    */
   long numItemsInTree() {
-    return count;
+    return nsCount;
+  }
+  
+  long diskspaceConsumed() {
+    return diskspace;
   }
   
   /** Update the size of the tree
    * 
-   * @param delta the change of the tree size
+   * @param nsDelta the change of the tree size
+   * @param dsDelta change to disk space occupied
    * @throws QuotaExceededException if the changed size is greater 
    *                                than the quota
    */
-  void updateNumItemsInTree(long delta) throws QuotaExceededException {
-    long newCount = this.count + delta;
-    if (delta>0) {
-      verifyQuota(this.quota, newCount);
+  void updateNumItemsInTree(long nsDelta, long dsDelta) throws 
+                            QuotaExceededException {
+    long newCount = nsCount + nsDelta;
+    long newDiskspace = diskspace + dsDelta;
+    if (nsDelta>0 || dsDelta>0) {
+      verifyQuota(nsQuota, newCount, dsQuota, newDiskspace);
     }
-    this.count = newCount;
+    nsCount = newCount;
+    diskspace = newDiskspace;
   }
   
-  /** Set the size of the tree rooted at this directory
+  /** 
+   * Sets namespace and diskspace taken by the directory rooted 
+   * at this INode. This should be used carefully. It does not check 
+   * for quota violations.
    * 
-   * @param count size of the directory to be set
-   * @throws QuotaExceededException if the given count is greater than quota
+   * @param namespace size of the directory to be set
+   * @param diskspace disk space taken by all the nodes under this directory
    */
-  void setCount(long count) throws QuotaExceededException {
-    verifyQuota(this.quota, count);
-    this.count = count;
+  void setSpaceConsumed(long namespace, long diskspace) {
+    this.nsCount = namespace;
+    this.diskspace = diskspace;
   }
   
-  /** Verify if the count satisfies the quota restriction 
+  /** Verify if the namespace count and disk space satisfy the quota restriction 
    * @throws QuotaExceededException if the given quota is less than the count
    */
-  private static void verifyQuota(long quota, long count)
-  throws QuotaExceededException {
-    if (quota < count) {
-      throw new QuotaExceededException(quota, count);
+  private static void verifyQuota(long nsQuota, long nsCount, 
+                                  long dsQuota, long diskspace)
+                                  throws QuotaExceededException {
+    if ((nsQuota >= 0 && nsQuota < nsCount) || 
+        (dsQuota >= 0 && dsQuota < diskspace)) {
+      throw new QuotaExceededException(nsQuota, nsCount, dsQuota, diskspace);
     }
   }
 }

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Fri Sep 19 16:40:54 2008
@@ -130,9 +130,40 @@
     }
     summary[0] += bytes;
     summary[1]++;
+    summary[3] += diskspaceConsumed();
     return summary;
   }
 
+  
+
+  @Override
+  DirCounts spaceConsumedInTree(DirCounts counts) {
+    counts.nsCount += 1;
+    counts.dsCount += diskspaceConsumed();
+    return counts;
+  }
+
+  long diskspaceConsumed() {
+    return diskspaceConsumed(blocks);
+  }
+  
+  long diskspaceConsumed(Block[] blkArr) {
+    long size = 0;
+    for (Block blk : blkArr) {
+      if (blk != null) {
+        size += blk.getNumBytes();
+      }
+    }
+    /* If the last block is being written to, charge preferredBlockSize for
+     * it rather than its current length. Read it from blkArr (not the
+     * blocks field) so the result always describes the array passed in. */
+    if (blkArr.length > 0 && blkArr[blkArr.length-1] != null && 
+        isUnderConstruction()) {
+      size += preferredBlockSize - blkArr[blkArr.length-1].getNumBytes();
+    }
+    return size * blockReplication;
+  }
+  
   /**
    * Get the preferred block size of the file.
    * @return the number of bytes

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=697284&r1=697283&r2=697284&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Sep 19 16:40:54 2008
@@ -580,13 +580,9 @@
   }
 
   /** {@inheritDoc} */
-  public void setQuota(String path, long quota) throws IOException {
-    namesystem.setQuota(path, quota);
-  }
-  
-  /** {@inheritDoc} */
-  public void clearQuota(String path) throws IOException {
-    namesystem.clearQuota(path);
+  public void setQuota(String path, long namespaceQuota, long diskspaceQuota) 
+                       throws IOException {
+    namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
   }
   
   /** {@inheritDoc} */