Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/04/29 05:03:27 UTC

svn commit: r1097671 [1/3] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/ src/c++/libhdfs/ src/contrib/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/co...

Author: todd
Date: Fri Apr 29 03:03:25 2011
New Revision: 1097671

URL: http://svn.apache.org/viewvc?rev=1097671&view=rev
Log:
Merge trunk as of r1097329 into HDFS-1073

Added:
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSLimitException.java
      - copied unchanged from r1097628, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSLimitException.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java
      - copied unchanged from r1097628, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
      - copied unchanged from r1097628, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
      - copied unchanged from r1097628, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNameNodeResourceChecker.java
      - copied unchanged from r1097628, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNameNodeResourceChecker.java
Modified:
    hadoop/hdfs/branches/HDFS-1073/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1073/bin/hdfs-config.sh
    hadoop/hdfs/branches/HDFS-1073/build.xml   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/contrib/build.xml
    hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/java/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/editsStored
    hadoop/hdfs/branches/HDFS-1073/src/webapps/datanode/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/webapps/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/webapps/secondary/   (props changed)

Propchange: hadoop/hdfs/branches/HDFS-1073/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs:713112
 /hadoop/hdfs/branches/HDFS-265:796829-820463
 /hadoop/hdfs/branches/branch-0.21:820487
-/hadoop/hdfs/trunk:1086482-1095244
+/hadoop/hdfs/trunk:1086482-1097628

Modified: hadoop/hdfs/branches/HDFS-1073/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/CHANGES.txt?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1073/CHANGES.txt Fri Apr 29 03:03:25 2011
@@ -34,6 +34,8 @@ Trunk (unreleased changes)
 
     HDFS-1070. Speedup namenode image loading and saving by storing only
     local file names. (hairong)
+    
+    HDFS-1751. Intrinsic limits for HDFS files, directories (daryn via boryas).
 
   IMPROVEMENTS
 
@@ -121,6 +123,29 @@ Trunk (unreleased changes)
     HDFS-1844. Move "fs -help" shell command tests from HDFS to COMMOM; see
     also HADOOP-7230.  (Daryn Sharp via szetszwo)
 
+    HDFS-1840. In DFSClient, terminate the lease renewing thread when all files
+    being written are closed for a grace period, and start a new thread when
+    new files are opened for write.  (szetszwo)
+
+    HDFS-1854. make failure message more useful in
+    DFSTestUtil.waitReplication(). (Matt Foley via eli)
+
+    HDFS-1562. Add rack policy tests. (eli)
+ 
+    HDFS-1856. TestDatanodeBlockScanner waits forever, errs without giving 
+    information. (Matt Foley via eli)
+
+    HDFS-1295. Improve namenode restart times by short-circuiting the
+    first block reports from datanodes. (Matt Foley via suresh)
+
+    HDFS-1843. Discover file not found early for file append. 
+    (Bharath Mundlapudi via jitendra)
+
+    HDFS-1862. Improve test reliability of HDFS-1594. (Aaron T. Myers via eli)
+
+    HDFS-1846. Preallocate edit log with OP_INVALID instead of zero bytes
+    to ensure blocks are actually allocated. (Aaron T. Myers via todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -193,6 +218,19 @@ Trunk (unreleased changes)
     HDFS-1831. Fix append bug in FileContext and implement CreateFlag
     check (related to HADOOP-7223). (suresh)
 
+    HDFS-1594. When the disk becomes full Namenode is getting shutdown and 
+    not able to recover. (Aaron T. Myers via eli)
+
+    HDFS-1822. Handle editlog opcode conflict with 0.20.203 during upgrade,
+    by throwing an error to indicate the editlog needs to be empty.
+    (suresh)
+
+    HDFS-1808. TestBalancer waits forever, errs without giving information.
+    (Matt Foley via eli)
+
+    HDFS-1829. TestNodeCount waits forever, errs without giving information.
+    (Matt Foley via eli)
+
 Release 0.22.0 - Unreleased
 
   NEW FEATURES
@@ -414,6 +452,9 @@ Release 0.22.0 - Unreleased
 
     HDFS-1582. Remove auto-generated native build files. (rvs via eli)
 
+    HDFS-1861. Rename dfs.datanode.max.xcievers and bump its default value.
+    (eli)
+
   OPTIMIZATIONS
 
     HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)
@@ -657,6 +698,14 @@ Release 0.22.0 - Unreleased
     HDFS-1806. TestBlockReport.blockReport_08() and _09() are timing-dependent
     and likely to fail on fast servers. (Matt Foley via eli)
 
+    HDFS-1845. Symlink comes up as directory after namenode restart.
+    (John George via eli)
+
+    HDFS-1666. Disable failing hdfsproxy test TestAuthorizationFilter (todd)
+
+    HDFS-1823. start-dfs.sh script fails if HADOOP_HOME is not set.
+    (tomwhite via eli)
+
 Release 0.21.1 - Unreleased
 
   IMPROVEMENTS
@@ -712,6 +761,9 @@ Release 0.21.1 - Unreleased
     HDFS-1786. Some cli test cases expect a "null" message
     (Uma Maheswara Rao G via todd)
 
+    HDFS-1855. TestDatanodeBlockScanner.testBlockCorruptionRecoveryPolicy()
+    part 2 fails in two different ways. (Matt Foley via eli)
+
 Release 0.21.0 - 2010-08-13
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hdfs/branches/HDFS-1073/bin/hdfs-config.sh
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/bin/hdfs-config.sh?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/bin/hdfs-config.sh (original)
+++ hadoop/hdfs/branches/HDFS-1073/bin/hdfs-config.sh Fri Apr 29 03:03:25 2011
@@ -27,6 +27,8 @@ if [ -d "${HADOOP_COMMON_HOME}" ]; then
   . "$HADOOP_COMMON_HOME"/bin/hadoop-config.sh
 elif [ -d "${HADOOP_HOME}" ]; then
   . "$HADOOP_HOME"/bin/hadoop-config.sh
+elif [ -e "${HADOOP_HDFS_HOME}"/bin/hadoop-config.sh ]; then
+  . "$HADOOP_HDFS_HOME"/bin/hadoop-config.sh
 else
   echo "Hadoop common not found."
   exit

Propchange: hadoop/hdfs/branches/HDFS-1073/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/build.xml:779102
 /hadoop/hdfs/branches/HDFS-265/build.xml:796829-820463
 /hadoop/hdfs/branches/branch-0.21/build.xml:820487
-/hadoop/hdfs/trunk/build.xml:1086482-1095244
+/hadoop/hdfs/trunk/build.xml:1086482-1097628

Propchange: hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
 /hadoop/core/trunk/src/c++/libhdfs:776175-784663
-/hadoop/hdfs/trunk/src/c++/libhdfs:1086482-1095244
+/hadoop/hdfs/trunk/src/c++/libhdfs:1086482-1097628

Modified: hadoop/hdfs/branches/HDFS-1073/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/contrib/build.xml?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/contrib/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/contrib/build.xml Fri Apr 29 03:03:25 2011
@@ -46,9 +46,11 @@
   <!-- Test all the contribs.                               -->
   <!-- ====================================================== -->
   <target name="test">
+    <!-- hdfsproxy tests failing due to HDFS-1666
     <subant target="test">
       <fileset dir="." includes="hdfsproxy/build.xml"/>
     </subant>
+      -->
   </target>
   
   

Propchange: hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy:820487
-/hadoop/hdfs/trunk/src/contrib/hdfsproxy:1086482-1095244
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:1086482-1097628

Propchange: hadoop/hdfs/branches/HDFS-1073/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/java:820487
-/hadoop/hdfs/trunk/src/java:1086482-1095244
+/hadoop/hdfs/trunk/src/java:1086482-1097628

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml Fri Apr 29 03:03:25 2011
@@ -195,6 +195,20 @@ creations/deletions), or "all".</descrip
 </property>
 
 <property>
+  <name>dfs.namenode.fs-limits.max-component-length</name>
+  <value>0</value>
+  <description>Defines the maximum number of characters in each component
+      of a path.  A value of 0 will disable the check.</description>
+</property>
+
+<property>
+  <name>dfs.namenode.fs-limits.max-directory-items</name>
+  <value>0</value>
+  <description>Defines the maximum number of items that a directory may
+      contain.  A value of 0 will disable the check.</description>
+</property>
+
+<property>
   <name>dfs.namenode.edits.dir</name>
   <value>${dfs.namenode.name.dir}</value>
   <description>Determines where on the local filesystem the DFS name node

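The two fs-limits properties above are plain namenode-side guards, and a
value of 0 preserves the historical unlimited behavior. A rough sketch of
how such checks might consume their keys follows; the class and method
names are illustrative, not the merged FSDirectory code, which raises
FSLimitException subtypes:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    // Illustrative sketch only: reads the two fs-limits keys defined above
    // and applies them the way their descriptions specify (0 = disabled).
    class FsLimitsSketch {
      private final int maxComponentLength;
      private final int maxDirItems;

      FsLimitsSketch(Configuration conf) {
        maxComponentLength = conf.getInt(
            "dfs.namenode.fs-limits.max-component-length", 0);
        maxDirItems = conf.getInt(
            "dfs.namenode.fs-limits.max-directory-items", 0);
      }

      void checkComponentLength(String component) throws IOException {
        if (maxComponentLength > 0 && component.length() > maxComponentLength) {
          throw new IOException("path component '" + component
              + "' is longer than the limit of " + maxComponentLength);
        }
      }

      void checkDirectoryItems(int childCount) throws IOException {
        if (maxDirItems > 0 && childCount >= maxDirItems) {
          throw new IOException("directory already holds the maximum of "
              + maxDirItems + " items");
        }
      }
    }
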
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java Fri Apr 29 03:03:25 2011
@@ -45,6 +45,7 @@ import javax.net.SocketFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -128,7 +129,6 @@ public class DFSClient implements FSCons
   private volatile long serverDefaultsLastUpdate;
   static Random r = new Random();
   final String clientName;
-  final LeaseChecker leasechecker = new LeaseChecker();
   Configuration conf;
   long defaultBlockSize;
   private short defaultReplication;
@@ -138,6 +138,7 @@ public class DFSClient implements FSCons
   final DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
+  final LeaseChecker leasechecker;
 
   /**
    * The locking hierarchy is to first acquire lock on DFSClient object, followed by 
@@ -253,6 +254,7 @@ public class DFSClient implements FSCons
 
     // The hdfsTimeout is currently the same as the ipc timeout 
     this.hdfsTimeout = Client.getTimeout(conf);
+    this.leasechecker = new LeaseChecker(hdfsTimeout);
 
     this.ugi = UserGroupInformation.getCurrentUser();
     
@@ -753,10 +755,14 @@ public class DFSClient implements FSCons
    * 
    * @see ClientProtocol#append(String, String) 
    */
-  OutputStream append(String src, int buffersize, Progressable progress)
+  OutputStream append(String src, int buffersize, Progressable progress) 
       throws IOException {
     checkOpen();
     HdfsFileStatus stat = getFileInfo(src);
+    if (stat == null) { // No file found
+      throw new FileNotFoundException("failed to append to non-existent file "
+          + src + " on client " + clientName);
+    }
     OutputStream result = callAppend(stat, src, buffersize, progress);
     leasechecker.put(src, result);
     return result;
@@ -1358,38 +1364,106 @@ public class DFSClient implements FSCons
     }
   }
 
-  boolean isLeaseCheckerStarted() {
-    return leasechecker.daemon != null;
-  }
-
   /** Lease management*/
-  class LeaseChecker implements Runnable {
+  class LeaseChecker {
+    static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+    static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
     /** A map from src -> DFSOutputStream of files that are currently being
      * written by this client.
      */
     private final SortedMap<String, OutputStream> pendingCreates
         = new TreeMap<String, OutputStream>();
+    /** The time in milliseconds that the map became empty. */
+    private long emptyTime = Long.MAX_VALUE;
+    /** A fixed lease renewal time period in milliseconds */
+    private final long renewal;
 
+    /** A daemon for renewing lease */
     private Daemon daemon = null;
-    
+    /** Only the daemon with currentId should run. */
+    private int currentId = 0;
+
+    /** 
+     * A period in milliseconds that the lease renewer thread should run
+     * after the map became empty.
+     * If the map is empty for a time period longer than the grace period,
+     * the renewer should terminate.  
+     */
+    private long gracePeriod;
+    /**
+     * The time period in milliseconds
+     * that the renewer sleeps for each iteration. 
+     */
+    private volatile long sleepPeriod;
+
+    private LeaseChecker(final long timeout) {
+      this.renewal = (timeout > 0 && timeout < LEASE_SOFTLIMIT_PERIOD)? 
+          timeout/2: LEASE_SOFTLIMIT_PERIOD/2;
+      setGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+    }
+
+    /** Set the grace period and adjust the sleep period accordingly. */
+    void setGraceSleepPeriod(final long gracePeriod) {
+      if (gracePeriod < 100L) {
+        throw new HadoopIllegalArgumentException(gracePeriod
+            + " = gracePeriod < 100ms is too small.");
+      }
+      synchronized(this) {
+        this.gracePeriod = gracePeriod;
+      }
+      final long half = gracePeriod/2;
+      this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
+          half: LEASE_RENEWER_SLEEP_DEFAULT;
+    }
+
+    /** Is the daemon running? */
+    synchronized boolean isRunning() {
+      return daemon != null && daemon.isAlive();
+    }
+
+    /** Is the empty period longer than the grace period? */  
+    private synchronized boolean isRenewerExpired() {
+      return emptyTime != Long.MAX_VALUE
+          && System.currentTimeMillis() - emptyTime > gracePeriod;
+    }
+
     synchronized void put(String src, OutputStream out) {
       if (clientRunning) {
-        if (daemon == null) {
-          daemon = new Daemon(this);
+        if (daemon == null || isRenewerExpired()) {
+          //start a new daemon with a new id.
+          final int id = ++currentId;
+          daemon = new Daemon(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                LeaseChecker.this.run(id);
+              } catch(InterruptedException e) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug(LeaseChecker.this.getClass().getSimpleName()
+                      + " is interrupted.", e);
+                }
+              }
+            }
+          });
           daemon.start();
         }
         pendingCreates.put(src, out);
+        emptyTime = Long.MAX_VALUE;
       }
     }
     
     synchronized void remove(String src) {
       pendingCreates.remove(src);
+      if (pendingCreates.isEmpty() && emptyTime == Long.MAX_VALUE) {
+        //discover the first time that the map is empty.
+        emptyTime = System.currentTimeMillis();
+      }
     }
     
     void interruptAndJoin() throws InterruptedException {
       Daemon daemonCopy = null;
       synchronized (this) {
-        if (daemon != null) {
+        if (isRunning()) {
           daemon.interrupt();
           daemonCopy = daemon;
         }
@@ -1456,37 +1530,30 @@ public class DFSClient implements FSCons
      * Periodically check in with the namenode and renew all the leases
      * when the lease period is half over.
      */
-    public void run() {
-      long lastRenewed = 0;
-      int renewal = (int)(LEASE_SOFTLIMIT_PERIOD / 2);
-      if (hdfsTimeout > 0) {
-        renewal = Math.min(renewal, hdfsTimeout/2);
-      }
-      while (clientRunning && !Thread.interrupted()) {
-        if (System.currentTimeMillis() - lastRenewed > renewal) {
+    private void run(final int id) throws InterruptedException {
+      for(long lastRenewed = System.currentTimeMillis();
+          clientRunning && !Thread.interrupted();
+          Thread.sleep(sleepPeriod)) {
+        if (System.currentTimeMillis() - lastRenewed >= renewal) {
           try {
             renew();
             lastRenewed = System.currentTimeMillis();
           } catch (SocketTimeoutException ie) {
-            LOG.warn("Problem renewing lease for " + clientName +
-                     " for a period of " + (hdfsTimeout/1000) +
-                     " seconds. Shutting down HDFS client...", ie);
+            LOG.warn("Failed to renew lease for " + clientName + " for "
+                + (renewal/1000) + " seconds.  Aborting ...", ie);
             abort();
             break;
           } catch (IOException ie) {
-            LOG.warn("Problem renewing lease for " + clientName +
-                     " for a period of " + (hdfsTimeout/1000) +
-                     " seconds. Will retry shortly...", ie);
+            LOG.warn("Failed to renew lease for " + clientName + " for "
+                + (renewal/1000) + " seconds.  Will retry shortly ...", ie);
           }
         }
 
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(this + " is interrupted.", ie);
+        synchronized(this) {
+          if (id != currentId || isRenewerExpired()) {
+            //no longer the current daemon or expired
+            return;
           }
-          return;
         }
       }
     }

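Stepping back from the diff, the reworked LeaseChecker is a reusable
pattern: the renewer daemon records the moment its work map becomes empty,
keeps running through a grace period, and exits once the emptiness outlasts
it; a later put() starts a fresh daemon under a new id, so a stale thread
can never race the current one. A compact standalone sketch of just that
pattern, with illustrative names rather than the HDFS API:

    // Sketch of the expire-after-grace-period renewer pattern above.
    import java.util.HashMap;
    import java.util.Map;

    class GracefulRenewer {
      private final Map<String, Object> pending = new HashMap<String, Object>();
      private final long gracePeriod;          // linger time once empty
      private long emptyTime = Long.MAX_VALUE; // when pending became empty
      private int currentId = 0;               // only the newest daemon runs
      private Thread daemon;

      GracefulRenewer(long gracePeriod) { this.gracePeriod = gracePeriod; }

      synchronized void put(String key, Object value) {
        if (daemon == null || !daemon.isAlive() || isExpired()) {
          final int id = ++currentId;          // retire any older daemon
          daemon = new Thread(new Runnable() {
            public void run() { loop(id); }
          });
          daemon.setDaemon(true);
          daemon.start();
        }
        pending.put(key, value);
        emptyTime = Long.MAX_VALUE;            // map is non-empty again
      }

      synchronized void remove(String key) {
        pending.remove(key);
        if (pending.isEmpty() && emptyTime == Long.MAX_VALUE) {
          emptyTime = System.currentTimeMillis();
        }
      }

      private synchronized boolean isExpired() {
        return emptyTime != Long.MAX_VALUE
            && System.currentTimeMillis() - emptyTime > gracePeriod;
      }

      private synchronized boolean shouldStop(int id) {
        return id != currentId || isExpired();
      }

      private void loop(int id) {
        while (!shouldStop(id)) {
          renew();                             // stand-in for the real work
          try { Thread.sleep(1000); } catch (InterruptedException e) { return; }
        }
      }

      void renew() { /* renew everything in pending */ }
    }
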
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Apr 29 03:03:25 2011
@@ -120,6 +120,12 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY = "dfs.namenode.delegation.token.max-lifetime";
   public static final long    DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7*24*60*60*1000; // 7 days
 
+  //Filesystem limit keys
+  public static final String  DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY = "dfs.namenode.fs-limits.max-component-length";
+  public static final int     DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT = 0; // no limit
+  public static final String  DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY = "dfs.namenode.fs-limits.max-directory-items";
+  public static final int     DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT = 0; // no limit
+
   //Following keys have no defaults
   public static final String  DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir";
   public static final String  DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
@@ -166,8 +172,8 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_DATANODE_HANDLER_COUNT_DEFAULT = 3;
   public static final String  DFS_DATANODE_HTTP_ADDRESS_KEY = "dfs.datanode.http.address";
   public static final String  DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50075";
-  public static final String  DFS_DATANODE_MAX_XCIEVERS_KEY = "dfs.datanode.max.xcievers";
-  public static final int     DFS_DATANODE_MAX_XCIEVERS_DEFAULT = 256;
+  public static final String  DFS_DATANODE_MAX_RECEIVER_THREADS_KEY = "dfs.datanode.max.transfer.threads";
+  public static final int     DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT = 4096;
   public static final String  DFS_DATANODE_NUMBLOCKS_KEY = "dfs.datanode.numblocks";
   public static final int     DFS_DATANODE_NUMBLOCKS_DEFAULT = 64;
   public static final String  DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours";
@@ -254,4 +260,9 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY = "dfs.secondary.namenode.kerberos.https.principal";
   public static final String  DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
   public static final int     DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
+  public static final String  DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY = "dfs.namenode.resource.check.interval";
+  public static final int     DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT = 5000;
+  public static final String  DFS_NAMENODE_DU_RESERVED_KEY = "dfs.namenode.resource.du.reserved";
+  public static final long    DFS_NAMENODE_DU_RESERVED_DEFAULT = 1024 * 1024 * 100; // 100 MB
+  public static final String  DFS_NAMENODE_CHECKED_VOLUMES_KEY = "dfs.namenode.resource.checked.volumes";
 }

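The three resource keys at the end of this hunk back the new
NameNodeResourceChecker (HDFS-1594 in CHANGES.txt): at every check interval
the namenode verifies that the volumes holding its metadata still have at
least the reserved space free, so it can enter safe mode instead of dying
when a disk fills. A hypothetical sketch of the core test; the real checker
also honors dfs.namenode.resource.checked.volumes:

    import java.io.File;

    // Hypothetical sketch: a volume is "low" once its usable space drops
    // under dfs.namenode.resource.du.reserved.
    class ResourceCheckSketch {
      // mirrors DFS_NAMENODE_DU_RESERVED_DEFAULT above (100 MB)
      static final long DU_RESERVED_DEFAULT = 100L * 1024 * 1024;

      static boolean hasAvailableDiskSpace(File volume, long duReserved) {
        return volume.getUsableSpace() > duReserved;
      }
    }
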
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java Fri Apr 29 03:03:25 2011
@@ -84,5 +84,6 @@ public class HdfsConfiguration extends C
     deprecate("dfs.permissions.supergroup", DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY);
     deprecate("dfs.write.packet.size", DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY);
     deprecate("dfs.block.size", DFSConfigKeys.DFS_BLOCK_SIZE_KEY);
+    deprecate("dfs.datanode.max.xcievers", DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY);
   }
 }

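The added deprecate() call keeps old configurations working: a value set
under the misspelled dfs.datanode.max.xcievers key is surfaced when code
reads the new dfs.datanode.max.transfer.threads key (see the
DataXceiverServer change below). A minimal usage sketch, assuming the
standard Configuration deprecation behavior:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class DeprecationSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        conf.set("dfs.datanode.max.xcievers", "8192");   // legacy key
        // the deprecation mapping exposes the value under the new key
        System.out.println(
            conf.getInt("dfs.datanode.max.transfer.threads", 4096)); // 8192
      }
    }
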
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Fri Apr 29 03:03:25 2011
@@ -88,7 +88,11 @@ public interface FSConstants {
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -31;
+  public static final int LAYOUT_VERSION = -34;
   // Current version: 
-  // -31: add persistent transaction ids
+  // -31, -32 and -33 are reserved for 0.20.203, 0.20.204 and 0.22.
+  // -34: persistent transaction IDs
+
+  // Record of version numbers for specific changes:
+  public static final int FIRST_TXNID_BASED_LAYOUT_VERSION = -35;
 }

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Fri Apr 29 03:03:25 2011
@@ -78,6 +78,9 @@ public abstract class Storage extends St
   // last layout version that did not support persistent rbw replicas
   public static final int PRE_RBW_LAYOUT_VERSION = -19;
   
+  /** Layout versions of 0.20.203 release */
+  public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
+
   private   static final String STORAGE_FILE_LOCK     = "in_use.lock";
   protected static final String STORAGE_FILE_VERSION  = "VERSION";
   public    static final String STORAGE_DIR_CURRENT   = "current";
@@ -871,4 +874,13 @@ public abstract class Storage extends St
       + "-" + Integer.toString(storage.getLayoutVersion())
       + "-" + Long.toString(storage.getCTime());
   }
+  
+  public static boolean is203LayoutVersion(int layoutVersion) {
+    for (int lv203 : LAYOUT_VERSIONS_203) {
+      if (lv203 == layoutVersion) {
+        return true;
+      }
+    }
+    return false;
+  }
 }

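is203LayoutVersion() gives upgrade code a one-line test for the layout
versions the 0.20.203 line shipped with, which is what the HDFS-1822
opcode-conflict handling listed in CHANGES.txt needs. An illustrative
guard; checkUpgradeFrom is hypothetical, not the merged code:

    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.common.Storage;

    class UpgradeGuardSketch {
      static void checkUpgradeFrom(int storedLayoutVersion,
          boolean editLogEmpty) throws IOException {
        if (Storage.is203LayoutVersion(storedLayoutVersion) && !editLogEmpty) {
          // 0.20.203/204 editlog opcodes conflict with trunk (HDFS-1822),
          // so upgrading from those versions needs an empty editlog.
          throw new IOException("Upgrade from layout version "
              + storedLayoutVersion + " requires an empty edit log");
        }
      }
    }
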
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Apr 29 03:03:25 2011
@@ -1131,23 +1131,27 @@ public class DataNode extends Configured
    * @throws IOException
    */
   private DatanodeCommand blockReport() throws IOException {
-    // send block report
+    // send block report if timer has expired.
     DatanodeCommand cmd = null;
     long startTime = now();
     if (startTime - lastBlockReport > blockReportInterval) {
-      //
-      // Send latest block report if timer has expired.
-      // Get back a list of local block(s) that are obsolete
-      // and can be safely GC'ed.
-      //
-      long brStartTime = now();
+
+      // Create block report
+      long brCreateStartTime = now();
       BlockListAsLongs bReport = data.getBlockReport();
 
+      // Send block report
+      long brSendStartTime = now();
       cmd = namenode.blockReport(dnRegistration, bReport.getBlockListAsLongs());
-      long brTime = now() - brStartTime;
-      myMetrics.blockReports.inc(brTime);
-      LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
-          " blocks got processed in " + brTime + " msecs");
+
+      // Log the block report processing stats from Datanode perspective
+      long brSendCost = now() - brSendStartTime;
+      long brCreateCost = brSendStartTime - brCreateStartTime;
+      myMetrics.blockReports.inc(brSendCost);
+      LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+          + " blocks took " + brCreateCost + " msec to generate and "
+          + brSendCost + " msecs for RPC and NN processing");
+
       //
       // If we have sent the first block report, then wait a random
       // time before we start the periodic block reports.

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Fri Apr 29 03:03:25 2011
@@ -56,7 +56,8 @@ class DataXceiverServer implements Runna
    * Enforcing the limit is required in order to avoid data-node
    * running out of memory.
    */
-  int maxXceiverCount = DFSConfigKeys.DFS_DATANODE_MAX_XCIEVERS_DEFAULT;
+  int maxXceiverCount =
+    DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
 
   /** A manager to make sure that cluster balancing does not
    * take too much resources.
@@ -115,8 +116,8 @@ class DataXceiverServer implements Runna
     this.datanode = datanode;
     
     this.maxXceiverCount = 
-      conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_XCIEVERS_KEY,
-                  DFSConfigKeys.DFS_DATANODE_MAX_XCIEVERS_DEFAULT);
+      conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
+                  DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT);
     
     this.estimateBlockSize = 
       conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);

Propchange: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -4,4 +4,4 @@
 /hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:820487
-/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:1086482-1095244
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:1086482-1097628

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Fri Apr 29 03:03:25 2011
@@ -36,13 +36,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
-import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -763,8 +763,8 @@ public class BlockManager {
         }
 
         // Go through all blocks that need replications.
-        BlockIterator neededReplicationsIterator = neededReplications
-            .iterator();
+        UnderReplicatedBlocks.BlockIterator neededReplicationsIterator = 
+            neededReplications.iterator();
         // skip to the first unprocessed block, which is at replIndex
         for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
           neededReplicationsIterator.next();
@@ -1053,28 +1053,59 @@ public class BlockManager {
        */
     }
   }
+  
+  /**
+   * StatefulBlockInfo is used to build the "toUC" list, which is a list of
+   * updates to the information about under-construction blocks.
+   * Besides the block in question, it provides the ReplicaState
+   * reported by the datanode in the block report. 
+   */
+  private static class StatefulBlockInfo {
+    final BlockInfoUnderConstruction storedBlock;
+    final ReplicaState reportedState;
+    
+    StatefulBlockInfo(BlockInfoUnderConstruction storedBlock, 
+        ReplicaState reportedState) {
+      this.storedBlock = storedBlock;
+      this.reportedState = reportedState;
+    }
+  }
 
   /**
    * The given node is reporting all its blocks.  Use this info to
-   * update the (machine-->blocklist) and (block-->machinelist) tables.
+   * update the (datanode-->blocklist) and (block-->nodelist) tables.
    */
-  public void processReport(DatanodeDescriptor node,
-                            BlockListAsLongs report) throws IOException {
-    //
+  public void processReport(DatanodeDescriptor node, BlockListAsLongs report) 
+  throws IOException {
+    
+    boolean isFirstBlockReport = (node.numBlocks() == 0);
+    if (isFirstBlockReport) {
+      // Initial block reports can be processed a lot more efficiently than
+      // ordinary block reports.  This shortens NN restart times.
+      processFirstBlockReport(node, report);
+      return;
+    } 
+
+    // Normal case:
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
     //
-    Collection<Block> toAdd = new LinkedList<Block>();
+    Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
     Collection<Block> toRemove = new LinkedList<Block>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
-    node.reportDiff(this, report, toAdd, toRemove, toInvalidate, toCorrupt);
+    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+    reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
 
+    // Process the blocks on each queue
+    for (StatefulBlockInfo b : toUC) { 
+      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+    }
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
     }
-    for (Block b : toAdd) {
-      addStoredBlock(b, node, null);
+    for (BlockInfo b : toAdd) {
+      addStoredBlock(b, node, null, true);
     }
     for (Block b : toInvalidate) {
       NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block "
@@ -1088,16 +1119,287 @@ public class BlockManager {
   }
 
   /**
+   * processFirstBlockReport is intended only for processing "initial" block
+   * reports, the first block report received from a DN after it registers.
+   * It just adds all the valid replicas to the datanode, without calculating 
+   * a toRemove list (since there won't be any).  It also silently discards 
+   * any invalid blocks, thereby deferring their processing until 
+   * the next block report.
+   * @param node - DatanodeDescriptor of the node that sent the report
+   * @param report - the initial block report, to be processed
+   * @throws IOException 
+   */
+  void processFirstBlockReport(DatanodeDescriptor node, BlockListAsLongs report) 
+  throws IOException {
+    if (report == null) return;
+    assert (namesystem.hasWriteLock());
+    assert (node.numBlocks() == 0);
+    BlockReportIterator itBR = report.getBlockReportIterator();
+
+    while(itBR.hasNext()) {
+      Block iblk = itBR.next();
+      ReplicaState reportedState = itBR.getCurrentReplicaState();
+      BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
+      // If block does not belong to any file, we are done.
+      if (storedBlock == null) continue;
+      
+      // If block is corrupt, mark it and continue to next block.
+      BlockUCState ucState = storedBlock.getBlockUCState();
+      if (isReplicaCorrupt(iblk, reportedState, storedBlock, ucState, node)) {
+        markBlockAsCorrupt(storedBlock, node);
+        continue;
+      }
+      
+      // If block is under construction, add this replica to its list
+      if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+            node, iblk, reportedState);
+        //and fall through to next clause
+      }      
+      //add replica if appropriate
+      if (reportedState == ReplicaState.FINALIZED) {
+        addStoredBlockImmediate(storedBlock, node);
+      }
+    }
+  }
+
+  void reportDiff(DatanodeDescriptor dn, 
+      BlockListAsLongs newReport, 
+      Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
+      Collection<Block> toRemove,           // remove from DatanodeDescriptor
+      Collection<Block> toInvalidate,       // should be removed from DN
+      Collection<BlockInfo> toCorrupt,      // add to corrupt replicas list
+      Collection<StatefulBlockInfo> toUC) { // add to under-construction list
+    // place a delimiter in the list which separates blocks 
+    // that have been reported from those that have not
+    BlockInfo delimiter = new BlockInfo(new Block(), 1);
+    boolean added = dn.addBlock(delimiter);
+    assert added : "Delimiting block cannot be present in the node";
+    if(newReport == null)
+      newReport = new BlockListAsLongs();
+    // scan the report and process newly reported blocks
+    BlockReportIterator itBR = newReport.getBlockReportIterator();
+    while(itBR.hasNext()) {
+      Block iblk = itBR.next();
+      ReplicaState iState = itBR.getCurrentReplicaState();
+      BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
+                                  toAdd, toInvalidate, toCorrupt, toUC);
+      // move block to the head of the list
+      if(storedBlock != null && storedBlock.findDatanode(dn) >= 0)
+        dn.moveBlockToHead(storedBlock);
+    }
+    // collect blocks that have not been reported
+    // all of them are next to the delimiter
+    Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
+        delimiter.getNext(0), dn);
+    while(it.hasNext())
+      toRemove.add(it.next());
+    dn.removeBlock(delimiter);
+  }
+
+  /**
+   * Process a block replica reported by the data-node.
+   * No side effects except adding to the passed-in Collections.
+   * 
+   * <ol>
+   * <li>If the block is not known to the system (not in blocksMap) then the
+   * data-node should be notified to invalidate this block.</li>
+   * <li>If the reported replica is valid that is has the same generation stamp
+   * and length as recorded on the name-node, then the replica location should
+   * be added to the name-node.</li>
+   * <li>If the reported replica is not valid, then it is marked as corrupt,
+   * which triggers replication of the existing valid replicas.
+   * Corrupt replicas are removed from the system when the block
+   * is fully replicated.</li>
+   * <li>If the reported replica is for a block currently marked "under
+   * construction" in the NN, then it should be added to the 
+   * BlockInfoUnderConstruction's list of replicas.</li>
+   * </ol>
+   * 
+   * @param dn descriptor for the datanode that made the report
+   * @param block reported block replica
+   * @param reportedState reported replica state
+   * @param toAdd add to DatanodeDescriptor
+   * @param toInvalidate missing blocks (not in the blocks map)
+   *        should be removed from the data-node
+   * @param toCorrupt replicas with unexpected length or generation stamp;
+   *        add to corrupt replicas
+   * @param toUC replicas of blocks currently under construction
+   * @return the stored block from the blocksMap, or null if the reported
+   *         block is not known to the name-node
+   */
+  BlockInfo processReportedBlock(DatanodeDescriptor dn, 
+      Block block, ReplicaState reportedState, 
+      Collection<BlockInfo> toAdd, 
+      Collection<Block> toInvalidate, 
+      Collection<BlockInfo> toCorrupt,
+      Collection<StatefulBlockInfo> toUC) {
+    
+    if(FSNamesystem.LOG.isDebugEnabled()) {
+      FSNamesystem.LOG.debug("Reported block " + block
+          + " on " + dn.getName() + " size " + block.getNumBytes()
+          + " replicaState = " + reportedState);
+    }
+  
+    // find block by blockId
+    BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+    if(storedBlock == null) {
+      // If blocksMap does not contain reported block id,
+      // the replica should be removed from the data-node.
+      toInvalidate.add(new Block(block));
+      return null;
+    }
+    BlockUCState ucState = storedBlock.getBlockUCState();
+    
+    // Block is on the NN
+    if(FSNamesystem.LOG.isDebugEnabled()) {
+      FSNamesystem.LOG.debug("In memory blockUCState = " + ucState);
+    }
+
+    // Ignore replicas already scheduled to be removed from the DN
+    if(belongsToInvalidates(dn.getStorageID(), block)) {
+      assert storedBlock.findDatanode(dn) < 0 : "Block " + block
+        + " in recentInvalidatesSet should not appear in DN " + this;
+      return storedBlock;
+    }
+
+    if (isReplicaCorrupt(block, reportedState, storedBlock, ucState, dn)) {
+      toCorrupt.add(storedBlock);
+      return storedBlock;
+    }
+
+    if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+      toUC.add(new StatefulBlockInfo(
+          (BlockInfoUnderConstruction)storedBlock, reportedState));
+      return storedBlock;
+    }
+
+    //add replica if appropriate
+    if (reportedState == ReplicaState.FINALIZED
+        && storedBlock.findDatanode(dn) < 0) {
+      toAdd.add(storedBlock);
+    }
+    return storedBlock;
+  }
+
+  /*
+   * The next two methods test the various cases under which we must conclude
+   * the replica is corrupt, or under construction.  These are laid out
+   * as switch statements, on the theory that it is easier to understand
+   * the combinatorics of reportedState and ucState that way.  It should be
+   * at least as efficient as boolean expressions.
+   */
+  private boolean isReplicaCorrupt(Block iblk, ReplicaState reportedState, 
+      BlockInfo storedBlock, BlockUCState ucState, 
+      DatanodeDescriptor dn) {
+    switch(reportedState) {
+    case FINALIZED:
+      switch(ucState) {
+      case COMPLETE:
+      case COMMITTED:
+        return (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()
+            || storedBlock.getNumBytes() != iblk.getNumBytes());
+      default:
+        return false;
+      }
+    case RBW:
+    case RWR:
+      return storedBlock.isComplete();
+    case RUR:       // should not be reported
+    case TEMPORARY: // should not be reported
+    default:
+      FSNamesystem.LOG.warn("Unexpected replica state " + reportedState
+          + " for block: " + storedBlock + 
+          " on " + dn.getName() + " size " + storedBlock.getNumBytes());
+      return true;
+    }
+  }
+
+  private boolean isBlockUnderConstruction(BlockInfo storedBlock, 
+      BlockUCState ucState, ReplicaState reportedState) {
+    switch(reportedState) {
+    case FINALIZED:
+      switch(ucState) {
+      case UNDER_CONSTRUCTION:
+      case UNDER_RECOVERY:
+        return true;
+      default:
+        return false;
+      }
+    case RBW:
+    case RWR:
+      return (!storedBlock.isComplete());
+    case RUR:       // should not be reported
+    case TEMPORARY: // should not be reported
+    default:
+      return false;
+    }
+  }
+  
+  void addStoredBlockUnderConstruction(
+      BlockInfoUnderConstruction block, 
+      DatanodeDescriptor node, 
+      ReplicaState reportedState) 
+  throws IOException {
+    block.addReplicaIfNotPresent(node, block, reportedState);
+    if (reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
+      addStoredBlock(block, node, null, true);
+    }
+  }
+  
+  /**
+   * Faster version of {@link #addStoredBlock}, intended for use with
+   * initial block report at startup.  If not in startup safe mode, will
+   * call standard addStoredBlock().
+   * Assumes this method is called "immediately" so there is no need to
+   * refresh the storedBlock from blocksMap.
+   * Doesn't handle underReplication/overReplication, or worry about
+   * pendingReplications or corruptReplicas, because it's in startup safe mode.
+   * Doesn't log every block, because there are typically millions of them.
+   * @throws IOException
+   */
+  private void addStoredBlockImmediate(BlockInfo storedBlock,
+                               DatanodeDescriptor node)
+  throws IOException {
+    assert (storedBlock != null && namesystem.hasWriteLock());
+    if (!namesystem.isInStartupSafeMode() 
+        || namesystem.isPopulatingReplQueues()) {
+      addStoredBlock(storedBlock, node, null, false);
+      return;
+    }
+
+    // just add it
+    node.addBlock(storedBlock);
+
+    // Now check for completion of blocks and safe block count
+    int numCurrentReplica = countLiveNodes(storedBlock);
+    if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
+        && numCurrentReplica >= minReplication)
+      storedBlock = completeBlock(storedBlock.getINode(), storedBlock);
+
+    // check whether safe replication is reached for the block
+    // only complete blocks are counted towards that
+    if(storedBlock.isComplete())
+      namesystem.incrementSafeBlockCount(numCurrentReplica);
+  }
+
+  /**
    * Modify (block-->datanode) map. Remove block from set of
    * needed replications if this takes care of the problem.
    * @return the block that is stored in blockMap.
    */
-  private Block addStoredBlock(final Block block,
+  private Block addStoredBlock(final BlockInfo block,
                                DatanodeDescriptor node,
-                               DatanodeDescriptor delNodeHint)
+                               DatanodeDescriptor delNodeHint,
+                               boolean logEveryBlock)
   throws IOException {
-    assert (namesystem.hasWriteLock());
-    BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+    assert (block != null && namesystem.hasWriteLock());
+    BlockInfo storedBlock;
+    if (block.getClass() == BlockInfoUnderConstruction.class) {
+      //refresh our copy in case the block got completed in another thread
+      storedBlock = blocksMap.getStoredBlock(block);
+    } else {
+      storedBlock = block;
+    }
     if (storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to any file, then we are done.
       NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
@@ -1113,29 +1415,25 @@ public class BlockManager {
     INodeFile fileINode = storedBlock.getINode();
     assert fileINode != null : "Block must belong to a file";
 
-    // add block to the data-node
+    // add block to the datanode
     boolean added = node.addBlock(storedBlock);
 
-    int curReplicaDelta = 0;
+    int curReplicaDelta;
     if (added) {
       curReplicaDelta = 1;
-      //
-      // At startup time, because too many new blocks come in
-      // they take up lots of space in the log file.
-      // So, we log only when namenode is out of safemode.
-      //
-      if (!namesystem.isInSafeMode()) {
+      if (logEveryBlock) {
         NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
             + "blockMap updated: " + node.getName() + " is added to " + 
             storedBlock + " size " + storedBlock.getNumBytes());
       }
     } else {
+      curReplicaDelta = 0;
       NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
           + "Redundant addStoredBlock request received for " + storedBlock
           + " on " + node.getName() + " size " + storedBlock.getNumBytes());
     }
 
-    // filter out containingNodes that are marked for decommission.
+    // Now check for completion of blocks and safe block count
     NumberReplicas num = countNodes(storedBlock);
     int numLiveReplicas = num.liveReplicas();
     int numCurrentReplica = numLiveReplicas
@@ -1147,18 +1445,19 @@ public class BlockManager {
 
     // check whether safe replication is reached for the block
     // only complete blocks are counted towards that
+    // Is no-op if not in safe mode.
     if(storedBlock.isComplete())
       namesystem.incrementSafeBlockCount(numCurrentReplica);
 
-    // if file is under construction, then check whether the block
-    // can be completed
+    // if file is under construction, then done for now
     if (fileINode.isUnderConstruction()) {
       return storedBlock;
     }
 
-    // do not handle mis-replicated blocks during start up
-    if (!namesystem.isPopulatingReplQueues())
+    // do not try to handle over/under-replicated blocks during safe mode
+    if (!namesystem.isPopulatingReplQueues()) {
       return storedBlock;
+    }
 
     // handle underReplication/overReplication
     short fileReplication = fileINode.getReplication();
@@ -1393,18 +1692,22 @@ public class BlockManager {
     pendingReplications.remove(block);
 
     // blockReceived reports a finalized block
-    Collection<Block> toAdd = new LinkedList<Block>();
+    Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
-    node.processReportedBlock(this, block, ReplicaState.FINALIZED,
-                              toAdd, toInvalidate, toCorrupt);
-    // the block is only in one of the lists
+    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+    processReportedBlock(node, block, ReplicaState.FINALIZED,
+                              toAdd, toInvalidate, toCorrupt, toUC);
+    // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
-    assert toAdd.size() + toInvalidate.size() <= 1 :
-      "The block should be only in one of the lists.";
+    assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
+      : "The block should be only in one of the lists.";
 
-    for (Block b : toAdd) {
-      addStoredBlock(b, node, delHintNode);
+    for (StatefulBlockInfo b : toUC) { 
+      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+    }
+    for (BlockInfo b : toAdd) {
+      addStoredBlock(b, node, delHintNode, true);
     }
     for (Block b : toInvalidate) {
       NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
@@ -1446,6 +1749,32 @@ public class BlockManager {
     return new NumberReplicas(live, count, corrupt, excess);
   }
 
+  /** 
+   * Simpler, faster form of {@link #countNodes} that only returns the number
+   * of live nodes.  If in startup safemode (or its 30-sec extension period),
+   * then it gains speed by ignoring issues of excess replicas or nodes
+   * that are decommissioned or in process of becoming decommissioned.
+   * If not in startup, then it calls {@link #countNodes} instead.
+   * 
+   * @param b - the block being tested
+   * @return count of live nodes for this block
+   */
+  int countLiveNodes(BlockInfo b) {
+    if (!namesystem.isInStartupSafeMode()) {
+      return countNodes(b).liveReplicas();
+    }
+    // else proceed with fast case
+    int live = 0;
+    Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
+    Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
+    while (nodeIter.hasNext()) {
+      DatanodeDescriptor node = nodeIter.next();
+      if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node)))
+        live++;
+    }
+    return live;
+  }
+
   private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
       NumberReplicas num) {
     int curReplicas = num.liveReplicas();
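
For illustration, a minimal sketch of how the fast path above might be used
during startup safe mode; the call site is hypothetical and not part of this
patch, mirroring the incrementSafeBlockCount pattern earlier in this file:

    // Hypothetical caller: while in startup safe mode, use the cheap live
    // count to update the safe-block tally without scanning excess or
    // decommissioning state.
    int live = countLiveNodes(storedBlock);
    if (storedBlock.isComplete()) {
      namesystem.incrementSafeBlockCount(live);
    }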
@@ -1781,7 +2110,7 @@ public class BlockManager {
   /**
    * Return an iterator over the set of blocks for which there are no replicas.
    */
-  BlockIterator getCorruptReplicaBlockIterator() {
+  UnderReplicatedBlocks.BlockIterator getCorruptReplicaBlockIterator() {
     return neededReplications
         .iterator(UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
   }

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Fri Apr 29 03:03:25 2011
@@ -24,11 +24,8 @@ import java.util.*;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -278,7 +275,7 @@ public class DatanodeDescriptor extends 
   /**
    * Iterates over the list of blocks belonging to the datanode.
    */
-  static private class BlockIterator implements Iterator<BlockInfo> {
+  static class BlockIterator implements Iterator<BlockInfo> {
     private BlockInfo current;
     private DatanodeDescriptor node;
       
@@ -421,141 +418,6 @@ public class DatanodeDescriptor extends 
     return blockarray;
   }
 
-  void reportDiff(BlockManager blockManager,
-                  BlockListAsLongs newReport,
-                  Collection<Block> toAdd,    // add to DatanodeDescriptor
-                  Collection<Block> toRemove, // remove from DatanodeDescriptor
-                  Collection<Block> toInvalidate, // should be removed from DN
-                  Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
-    // place a delimiter in the list which separates blocks 
-    // that have been reported from those that have not
-    BlockInfo delimiter = new BlockInfo(new Block(), 1);
-    boolean added = this.addBlock(delimiter);
-    assert added : "Delimiting block cannot be present in the node";
-    if(newReport == null)
-      newReport = new BlockListAsLongs();
-    // scan the report and process newly reported blocks
-    BlockReportIterator itBR = newReport.getBlockReportIterator();
-    while(itBR.hasNext()) {
-      Block iblk = itBR.next();
-      ReplicaState iState = itBR.getCurrentReplicaState();
-      BlockInfo storedBlock = processReportedBlock(blockManager, iblk, iState,
-                                               toAdd, toInvalidate, toCorrupt);
-      // move block to the head of the list
-      if(storedBlock != null && storedBlock.findDatanode(this) >= 0)
-        this.moveBlockToHead(storedBlock);
-    }
-    // collect blocks that have not been reported
-    // all of them are next to the delimiter
-    Iterator<? extends Block> it = new BlockIterator(delimiter.getNext(0),this);
-    while(it.hasNext())
-      toRemove.add(it.next());
-    this.removeBlock(delimiter);
-  }
-
-  /**
-   * Process a block replica reported by the data-node.
-   * 
-   * <ol>
-   * <li>If the block is not known to the system (not in blocksMap) then the
-   * data-node should be notified to invalidate this block.</li>
-   * <li>If the reported replica is valid that is has the same generation stamp
-   * and length as recorded on the name-node, then the replica location is
-   * added to the name-node.</li>
-   * <li>If the reported replica is not valid, then it is marked as corrupt,
-   * which triggers replication of the existing valid replicas.
-   * Corrupt replicas are removed from the system when the block
-   * is fully replicated.</li>
-   * </ol>
-   * 
-   * @param blockManager
-   * @param block reported block replica
-   * @param rState reported replica state
-   * @param toAdd add to DatanodeDescriptor
-   * @param toInvalidate missing blocks (not in the blocks map)
-   *        should be removed from the data-node
-   * @param toCorrupt replicas with unexpected length or generation stamp;
-   *        add to corrupt replicas
-   * @return
-   */
-  BlockInfo processReportedBlock(
-                  BlockManager blockManager,
-                  Block block,                // reported block replica
-                  ReplicaState rState,        // reported replica state
-                  Collection<Block> toAdd,    // add to DatanodeDescriptor
-                  Collection<Block> toInvalidate, // should be removed from DN
-                  Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
-    if(FSNamesystem.LOG.isDebugEnabled()) {
-      FSNamesystem.LOG.debug("Reported block " + block
-          + " on " + getName() + " size " + block.getNumBytes()
-          + " replicaState = " + rState);
-    }
-
-    // find block by blockId
-    BlockInfo storedBlock = blockManager.blocksMap.getStoredBlock(block);
-    if(storedBlock == null) {
-      // If blocksMap does not contain reported block id,
-      // the replica should be removed from the data-node.
-      toInvalidate.add(new Block(block));
-      return null;
-    }
-
-    if(FSNamesystem.LOG.isDebugEnabled()) {
-      FSNamesystem.LOG.debug("In memory blockUCState = " +
-          storedBlock.getBlockUCState());
-    }
-
-    // Ignore replicas already scheduled to be removed from the DN
-    if(blockManager.belongsToInvalidates(getStorageID(), block)) {
-      assert storedBlock.findDatanode(this) < 0 : "Block " + block 
-        + " in recentInvalidatesSet should not appear in DN " + this;
-      return storedBlock;
-    }
-
-    // Block is on the DN
-    boolean isCorrupt = false;
-    switch(rState) {
-    case FINALIZED:
-      switch(storedBlock.getBlockUCState()) {
-      case COMPLETE:
-      case COMMITTED:
-        if(storedBlock.getGenerationStamp() != block.getGenerationStamp()
-            || storedBlock.getNumBytes() != block.getNumBytes())
-          isCorrupt = true;
-        break;
-      case UNDER_CONSTRUCTION:
-      case UNDER_RECOVERY:
-        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
-            this, block, rState);
-      }
-      if(!isCorrupt && storedBlock.findDatanode(this) < 0)
-        if (storedBlock.getNumBytes() != block.getNumBytes()) {
-          toAdd.add(new Block(block));
-        } else {
-          toAdd.add(storedBlock);
-        }
-      break;
-    case RBW:
-    case RWR:
-      if(!storedBlock.isComplete())
-        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
-                                                      this, block, rState);
-      else
-        isCorrupt = true;
-      break;
-    case RUR:       // should not be reported
-    case TEMPORARY: // should not be reported
-    default:
-      FSNamesystem.LOG.warn("Unexpected replica state " + rState
-          + " for block: " + storedBlock + 
-          " on " + getName() + " size " + storedBlock.getNumBytes());
-      break;
-    }
-    if(isCorrupt)
-        toCorrupt.add(storedBlock);
-    return storedBlock;
-  }
-
   /** Serialization for FSEditLog */
   void readFieldsFromFSEditLog(DataInput in) throws IOException {
     this.name = DeprecatedUTF8.readString(in);

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Fri Apr 29 03:03:25 2011
@@ -43,7 +43,14 @@ class EditLogFileOutputStream extends Ed
   private DataOutputBuffer bufCurrent; // current buffer for writing
   private DataOutputBuffer bufReady; // buffer ready for flushing
   final private int initBufferSize; // initial buffer size
-  static ByteBuffer fill = ByteBuffer.allocateDirect(512); // preallocation
+  static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
+
+  static {
+    fill.position(0);
+    for (int i = 0; i < fill.capacity(); i++) {
+      fill.put(FSEditLogOpCodes.OP_INVALID.getOpCode());
+    }
+  }
 
   /**
    * Creates output buffers and file object.
@@ -190,12 +197,11 @@ class EditLogFileOutputStream extends Ed
         FSNamesystem.LOG.debug("Preallocating Edit log, current size "
             + fc.size());
       }
-      long newsize = position + 1024 * 1024; // 1MB
       fill.position(0);
-      int written = fc.write(fill, newsize);
+      int written = fc.write(fill, position);
       if(FSNamesystem.LOG.isDebugEnabled()) {
         FSNamesystem.LOG.debug("Edit log size is now " + fc.size() +
-            " written " + written + " bytes " + " at offset " + newsize);
+            " written " + written + " bytes " + " at offset " + position);
       }
     }
   }
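
The change above writes the fill at the current end of the log rather than
1MB past it, and the new static block stuffs the fill buffer with OP_INVALID
opcodes so a reader stops cleanly at the first preallocated byte. A
self-contained sketch of the fill pattern, assuming OP_INVALID's byte value
is -1 (in the patch it comes from FSEditLogOpCodes.OP_INVALID.getOpCode()):

    import java.nio.ByteBuffer;

    public class PreallocFillSketch {
      // Assumed stand-in for FSEditLogOpCodes.OP_INVALID.getOpCode().
      static final byte OP_INVALID = -1;
      static final ByteBuffer FILL = ByteBuffer.allocateDirect(1024 * 1024);

      static {
        // Fill once up front; each preallocation rewinds the buffer and
        // writes all of it at the current write position.
        for (int i = 0; i < FILL.capacity(); i++) {
          FILL.put(OP_INVALID);
        }
      }

      public static void main(String[] args) {
        FILL.position(0); // rewind, as done before fc.write(fill, position)
        System.out.println("fill bytes ready: " + FILL.remaining());
      }
    }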

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Apr 29 03:03:25 2011
@@ -42,6 +42,8 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.FSLimitException;
+import org.apache.hadoop.hdfs.protocol.FSLimitException.*;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -68,6 +70,8 @@ class FSDirectory implements Closeable {
   FSImage fsImage;  
   private volatile boolean ready = false;
   private static final long UNKNOWN_DISK_SPACE = -1;
+  private final int maxComponentLength;
+  private final int maxDirItems;
   private final int lsLimit;  // max list limit
 
   // lock to protect BlockMap.
@@ -119,6 +123,14 @@ class FSDirectory implements Closeable {
     this.lsLimit = configuredLimit>0 ?
         configuredLimit : DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT;
     
+    // filesystem limits
+    this.maxComponentLength = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);
+    this.maxDirItems = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT);
+
     int threshold = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY,
         DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT);
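
Both limits are skipped when their keys resolve to 0, as the verifyFsLimits
check below shows. A minimal sketch of enabling them programmatically; the
numeric values here are illustrative, not defaults from this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class FsLimitsConfSketch {
      public static Configuration limitedConf() {
        Configuration conf = new Configuration();
        // Cap each path component name at 255 characters (0 disables).
        conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY, 255);
        // Cap entries per directory at 1M (0 disables).
        conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,
            1024 * 1024);
        return conf;
      }
    }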
@@ -158,7 +170,7 @@ class FSDirectory implements Closeable {
     }
     writeLock();
     try {
-      this.ready = true;
+      setReady(true);
       this.nameCache.initialized();
       cond.signalAll();
     } finally {
@@ -166,6 +178,11 @@ class FSDirectory implements Closeable {
     }
   }
 
+  // exposed for unit tests
+  protected void setReady(boolean flag) {
+    ready = flag;
+  }
+
   private void incrDeletedFileCount(int count) {
     if (getFSNamesystem() != null)
       NameNode.getNameNodeMetrics().numFilesDeleted.inc(count);
@@ -1613,12 +1630,58 @@ class FSDirectory implements Closeable {
         commonAncestor);
   }
   
+  /**
+   * Verify that filesystem limit constraints are not violated
+   * @throws PathComponentTooLongException child's name is too long
+   * @throws MaxDirectoryItemsExceededException items per directory is exceeded
+   */
+  protected <T extends INode> void verifyFsLimits(INode[] pathComponents,
+      int pos, T child) throws FSLimitException {
+    boolean includeChildName = false;
+    try {
+      if (maxComponentLength != 0) {
+        int length = child.getLocalName().length();
+        if (length > maxComponentLength) {
+          includeChildName = true;
+          throw new PathComponentTooLongException(maxComponentLength, length);
+        }
+      }
+      if (maxDirItems != 0) {
+        INodeDirectory parent = (INodeDirectory)pathComponents[pos-1];
+        int count = parent.getChildren().size();
+        if (count >= maxDirItems) {
+          throw new MaxDirectoryItemsExceededException(maxDirItems, count);
+        }
+      }
+    } catch (FSLimitException e) {
+      String badPath = getFullPathName(pathComponents, pos-1);
+      if (includeChildName) {
+        badPath += Path.SEPARATOR + child.getLocalName();
+      }
+      e.setPathName(badPath);
+      // Do not throw if edits log is still being processed
+      if (ready) throw e;
+      // log pre-existing paths that exceed limits
+      NameNode.LOG.error("FSDirectory.verifyFsLimits - " + e.getLocalizedMessage());
+    }
+  }
+  
   /** Add a node child to the inodes at index pos. 
    * Its ancestors are stored at [0, pos-1]. 
    * QuotaExceededException is thrown if it violates quota limit */
   private <T extends INode> T addChild(INode[] pathComponents, int pos,
       T child, long childDiskspace, boolean inheritPermission,
       boolean checkQuota) throws QuotaExceededException {
+    // The filesystem limits are not really quotas, so this check may appear
+    // odd.  It's because a rename operation deletes the src, tries to add
+    // to the dest, and if that fails, re-adds the src from whence it came.
+    // The rename code disables the quota when it's restoring to the
+    // original location because a quota violation would cause the item
+    // to go "poof".  The fs limits must be bypassed for the same reason.
+    if (checkQuota) {
+      verifyFsLimits(pathComponents, pos, child);
+    }
+    
     INode.DirCounts counts = new INode.DirCounts();
     child.spaceConsumedInTree(counts);
     if (childDiskspace < 0) {
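
The ready gate at the bottom of verifyFsLimits above is the notable design
choice: while edits are still being replayed, a pre-existing path that
violates a newly configured limit is logged rather than rejected, so startup
can complete. A toy model of that gate (hypothetical names, not the patch's
code):

    public class ReadyGateSketch {
      // Enforce only once the image and edits are fully loaded; before
      // that, just record pre-existing violations.
      static void checkLimit(boolean ready, boolean violates, String path) {
        if (!violates) {
          return;
        }
        if (ready) {
          throw new IllegalStateException("limit exceeded: " + path);
        }
        System.err.println("pre-existing path exceeds limit: " + path);
      }

      public static void main(String[] args) {
        checkLimit(false, true, "/old/averylongname"); // logged only
        checkLimit(true, true, "/new/averylongname");  // throws
      }
    }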

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Apr 29 03:03:25 2011
@@ -527,6 +527,8 @@ public class FSEditLogLoader {
         }
         validateChecksum(in, checksum, numEdits);
       }
+    } catch (IOException ex) {
+      check203UpgradeFailure(logVersion, ex);
     } finally {
       if(closeOnExit)
         in.close();
@@ -670,4 +672,25 @@ public class FSEditLogLoader {
     }
     return blocks;
   }
+  
+  /** 
+   * Throw appropriate exception during upgrade from 203, when editlog loading
+   * could fail due to opcode conflicts.
+   */
+  private void check203UpgradeFailure(int logVersion, IOException ex)
+      throws IOException {
+    // The 0.20.203 release has opcodes that conflict with later releases.
+    // The editlog must be emptied by restarting the namenode on the old
+    // release before proceeding with the upgrade.
+    if (Storage.is203LayoutVersion(logVersion)
+        && logVersion != FSConstants.LAYOUT_VERSION) {
+      String msg = "During upgrade failed to load the editlog version "
+          + logVersion + " from release 0.20.203. Please go back to the old "
+          + " release and restart the namenode. This empties the editlog "
+          + " and saves the namespace. Resume the upgrade after this step.";
+      throw new IOException(msg, ex);
+    } else {
+      throw ex;
+    }
+  }
 }

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Apr 29 03:03:25 2011
@@ -71,8 +71,6 @@ public class FSImage implements NNStorag
   private static final SimpleDateFormat DATE_FORM =
       new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
-  private static final int FIRST_TXNID_BASED_LAYOUT_VERSION=-32;
-  
   // checkpoint states
   enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; }
 
@@ -520,9 +518,9 @@ public class FSImage implements NNStorag
     // (ie edits_<txnid>) then use the new inspector, which will ignore
     // the old format dirs.
     FSImageStorageInspector inspector;
-    if (minLayoutVersion <= FIRST_TXNID_BASED_LAYOUT_VERSION) {
+    if (minLayoutVersion <= FSConstants.FIRST_TXNID_BASED_LAYOUT_VERSION) {
       inspector = new FSImageTransactionalStorageInspector();
-      if (maxLayoutVersion > FIRST_TXNID_BASED_LAYOUT_VERSION) {
+      if (maxLayoutVersion > FSConstants.FIRST_TXNID_BASED_LAYOUT_VERSION) {
         LOG.warn("Ignoring one or more storage directories with old layouts");
       }
     } else {

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Apr 29 03:03:25 2011
@@ -238,7 +238,9 @@ public class FSNamesystem implements FSC
   public Daemon lmthread = null;   // LeaseMonitor thread
   Daemon smmthread = null;  // SafeModeMonitor thread
   public Daemon replthread = null;  // Replication thread
-  
+  Daemon nnrmthread = null; // NamenodeResourceMonitor thread
+
+  private volatile boolean hasResourcesAvailable = false;
   private volatile boolean fsRunning = true;
   long systemStart = 0;
 
@@ -249,6 +251,13 @@ public class FSNamesystem implements FSC
   private long heartbeatExpireInterval;
   //replicationRecheckInterval is how often namenode checks for new replication work
   private long replicationRecheckInterval;
+
+  //resourceRecheckInterval is how often namenode checks for the disk space availability
+  private long resourceRecheckInterval;
+
+  // The actual resource checker instance.
+  NameNodeResourceChecker nnResourceChecker;
+
   private FsServerDefaults serverDefaults;
   // allow appending to hdfs files
   private boolean supportAppends = true;
@@ -299,6 +308,11 @@ public class FSNamesystem implements FSC
    */
   private void initialize(Configuration conf, FSImage fsImage)
       throws IOException {
+    resourceRecheckInterval =
+        conf.getLong(DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
+    nnResourceChecker = new NameNodeResourceChecker(conf);
+    checkAvailableResources();
     this.systemStart = now();
     this.blockManager = new BlockManager(this, conf);
     this.fsLock = new ReentrantReadWriteLock(true); // fair locking
@@ -346,6 +360,9 @@ public class FSNamesystem implements FSC
     lmthread.start();
     replthread.start();
 
+    this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
+    nnrmthread.start();
+
     this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
         conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 
                     DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
@@ -358,7 +375,7 @@ public class FSNamesystem implements FSC
                       ScriptBasedMapping.class,
             DNSToSwitchMapping.class), conf);
     
-    /* If the dns to swith mapping supports cache, resolve network 
+    /* If the dns to switch mapping supports cache, resolve network
      * locations of those hosts in the include list, 
      * and store the mapping in the cache; so future calls to resolve
      * will be fast.
@@ -556,6 +573,7 @@ public class FSNamesystem implements FSC
       if (dnthread != null) dnthread.interrupt();
       if (smmthread != null) smmthread.interrupt();
       if (dtSecretManager != null) dtSecretManager.stopThreads();
+      if (nnrmthread != null) nnrmthread.interrupt();
     } catch (Exception e) {
       LOG.warn("Exception shutting down FSNamesystem", e);
     } finally {
@@ -2839,6 +2857,57 @@ public class FSNamesystem implements FSC
   }
 
   /**
+   * Returns whether sufficient resources were available at the last resource
+   * check.
+   *
+   * @return true if there were sufficient resources available, false otherwise.
+   */
+  private boolean nameNodeHasResourcesAvailable() {
+    return hasResourcesAvailable;
+  }
+
+  /**
+   * Perform resource checks and cache the results.
+   * @throws IOException
+   */
+  private void checkAvailableResources() throws IOException {
+    hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
+  }
+
+  /**
+   * Periodically calls NameNodeResourceChecker.hasAvailableDiskSpace(), and
+   * if resources are found to be insufficient, causes the NN to enter safe
+   * mode. The monitor itself does not leave safe mode; once resources return
+   * to acceptable levels, safe mode must be turned off manually (see
+   * SafeModeInfo#getTurnOffTip).
+   */
+  class NameNodeResourceMonitor implements Runnable  {
+    @Override
+    public void run () {
+      try {
+        while (fsRunning) {
+          checkAvailableResources();
+          if(!nameNodeHasResourcesAvailable()) {
+            String lowResourcesMsg = "NameNode low on available disk space. ";
+            if (!isInSafeMode()) {
+              FSNamesystem.LOG.warn(lowResourcesMsg + "Entering safe mode.");
+            } else {
+              FSNamesystem.LOG.warn(lowResourcesMsg + "Already in safe mode.");
+            }
+            enterSafeMode(true);
+          }
+          try {
+            Thread.sleep(resourceRecheckInterval);
+          } catch (InterruptedException ie) {
+            // Deliberately ignore
+          }
+        }
+      } catch (Exception e) {
+        FSNamesystem.LOG.error("Exception in NameNodeResourceMonitor: ", e);
+      }
+    }
+  }
+
+  /**
    * Update access keys.
    */
   void updateBlockKey() throws IOException {
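
The monitor's poll period is the resourceRecheckInterval read in initialize()
above. A minimal sketch of tightening it, assuming the stock Configuration
API (the 5-second value is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ResourceCheckIntervalSketch {
      public static Configuration conf() {
        Configuration conf = new Configuration();
        // Poll the NameNodeResourceChecker every 5 seconds.
        conf.setLong(
            DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY, 5000L);
        return conf;
      }
    }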
@@ -3128,20 +3197,15 @@ public class FSNamesystem implements FSC
    * update the (machine-->blocklist) and (block-->machinelist) tables.
    */
   public void processReport(DatanodeID nodeID, 
-                                         BlockListAsLongs newReport
-                                        ) throws IOException {
+      BlockListAsLongs newReport) throws IOException {
+    long startTime, endTime;
 
     writeLock();
+    startTime = now(); // after acquiring the write lock
     try {
-    long startTime = now();
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
-                             + "from " + nodeID.getName()+" " + 
-                             newReport.getNumberOfBlocks()+" blocks");
-    }
     DatanodeDescriptor node = getDatanode(nodeID);
     if (node == null || !node.isAlive) {
-      throw new IOException("ProcessReport from dead or unregisterted node: "
+      throw new IOException("ProcessReport from dead or unregistered node: "
                             + nodeID.getName());
     }
     // To minimize startup time, we discard any second (or later) block reports
@@ -3154,10 +3218,16 @@ public class FSNamesystem implements FSC
     }
 
     blockManager.processReport(node, newReport);
-    NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
     } finally {
+      endTime = now();
       writeUnlock();
     }
+
+    // Log the block report processing stats from Namenode perspective
+    NameNode.getNameNodeMetrics().blockReport.inc((int) (endTime - startTime));
+    NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: from "
+        + nodeID.getName() + ", blocks: " + newReport.getNumberOfBlocks()
+        + ", processing time: " + (endTime - startTime) + " msecs");
   }
 
   /**
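
With the refactor above, the timed section covers only the work done under
the write lock, and the summary is logged at info level after the lock is
released. An illustrative line in the new format (values hypothetical):

    BLOCK* NameSystem.processReport: from 127.0.0.1:50010, blocks: 15342, processing time: 210 msecs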
@@ -3825,6 +3895,8 @@ public class FSNamesystem implements FSC
     private long lastStatusReport = 0;
     /** flag indicating whether replication queues have been initialized */
     private boolean initializedReplQueues = false;
+    /** Was safemode entered automatically because available resources were low. */
+    private boolean resourcesLow = false;
     
     /**
      * Creates SafeModeInfo when the name node enters
@@ -3849,14 +3921,15 @@ public class FSNamesystem implements FSC
     }
 
     /**
-     * Creates SafeModeInfo when safe mode is entered manually.
+     * Creates SafeModeInfo when safe mode is entered manually, or because
+     * available resources are low.
      *
      * The {@link #threshold} is set to 1.5 so that it could never be reached.
      * {@link #blockTotal} is set to -1 to indicate that safe mode is manual.
      * 
      * @see SafeModeInfo
      */
-    private SafeModeInfo() {
+    private SafeModeInfo(boolean resourcesLow) {
       this.threshold = 1.5f;  // this threshold can never be reached
       this.datanodeThreshold = Integer.MAX_VALUE;
       this.extension = Integer.MAX_VALUE;
@@ -3865,6 +3938,7 @@ public class FSNamesystem implements FSC
       this.blockTotal = -1;
       this.blockSafe = -1;
       this.reached = -1;
+      this.resourcesLow = resourcesLow;
       enter();
       reportStatus("STATE* Safe mode is ON.", true);
     }
@@ -3915,7 +3989,7 @@ public class FSNamesystem implements FSC
         }
         if(needUpgrade) {
           // switch to manual safe mode
-          safeMode = new SafeModeInfo();
+          safeMode = new SafeModeInfo(false);
           return;
         }
       }
@@ -3948,8 +4022,13 @@ public class FSNamesystem implements FSC
       if (isPopulatingReplQueues()) {
         LOG.warn("Replication queues already initialized.");
       }
+      long startTimeMisReplicatedScan = now();
       blockManager.processMisReplicatedBlocks();
       initializedReplQueues = true;
+      NameNode.stateChangeLog.info("STATE* Replication Queue initialization "
+          + "scan for invalid, over- and under-replicated blocks "
+          + "completed in " + (now() - startTimeMisReplicatedScan)
+          + " msec");
     }
 
     /**
@@ -3982,7 +4061,8 @@ public class FSNamesystem implements FSC
      */
     boolean needEnter() {
       return (threshold != 0 && blockSafe < blockThreshold) ||
-        (getNumLiveDataNodes() < datanodeThreshold);
+        (getNumLiveDataNodes() < datanodeThreshold) ||
+        (!nameNodeHasResourcesAvailable());
     }
       
     /**
@@ -4054,10 +4134,11 @@ public class FSNamesystem implements FSC
     }
 
     /**
-     * Check if safe mode was entered manually or at startup.
+     * Check if safe mode was entered manually or automatically (at startup, or
+     * when disk space is low).
      */
     boolean isManual() {
-      return extension == Integer.MAX_VALUE;
+      return extension == Integer.MAX_VALUE && !resourcesLow;
     }
 
     /**
@@ -4068,12 +4149,31 @@ public class FSNamesystem implements FSC
     }
 
     /**
+     * Check if safe mode was entered due to resources being low.
+     */
+    boolean areResourcesLow() {
+      return resourcesLow;
+    }
+
+    /**
+     * Set that resources are low for this instance of safe mode.
+     */
+    void setResourcesLow() {
+      resourcesLow = true;
+    }
+
+    /**
      * A tip on how safe mode is to be turned off: manually or automatically.
      */
     String getTurnOffTip() {
       if(reached < 0)
         return "Safe mode is OFF.";
-      String leaveMsg = "Safe mode will be turned off automatically";
+      String leaveMsg = "";
+      if (areResourcesLow()) {
+        leaveMsg = "Resources are low on NN. Safe mode must be turned off manually";
+      } else {
+        leaveMsg = "Safe mode will be turned off automatically";
+      }
       if(isManual()) {
         if(getDistributedUpgradeState())
           return leaveMsg + " upon completion of " + 
@@ -4081,6 +4181,7 @@ public class FSNamesystem implements FSC
             getDistributedUpgradeStatus() + "%";
         leaveMsg = "Use \"hdfs dfsadmin -safemode leave\" to turn safe mode off";
       }
+
       if(blockTotal < 0)
         return leaveMsg + ".";
 
@@ -4203,7 +4304,7 @@ public class FSNamesystem implements FSC
         leaveSafeMode(false);
         break;
       case SAFEMODE_ENTER: // enter safe mode
-        enterSafeMode();
+        enterSafeMode(false);
         break;
       }
     }
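
enterSafeMode(false) keeps the old dfsadmin behavior, while the resource
monitor passes true. A minimal client-side sketch of driving this RPC path,
assuming DistributedFileSystem.setSafeMode and the SafeModeAction enum from
FSConstants (and a conf whose default filesystem is HDFS):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;

    public class SafeModeToggleSketch {
      public static void main(String[] args) throws Exception {
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(new Configuration());
        // Equivalent of "hdfs dfsadmin -safemode enter" then "-safemode leave".
        dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
        dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
      }
    }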
@@ -4308,7 +4409,7 @@ public class FSNamesystem implements FSC
    * Enter safe mode manually.
    * @throws IOException
    */
-  void enterSafeMode() throws IOException {
+  void enterSafeMode(boolean resourcesLow) throws IOException {
     writeLock();
     try {
     // Ensure that any concurrent operations have been fully synced
@@ -4316,9 +4417,12 @@ public class FSNamesystem implements FSC
     // is entirely stable on disk as soon as we're in safe mode.
     getEditLog().logSyncAll();
     if (!isInSafeMode()) {
-      safeMode = new SafeModeInfo();
+      safeMode = new SafeModeInfo(resourcesLow);
       return;
     }
+    if (resourcesLow) {
+      safeMode.setResourcesLow();
+    }
     safeMode.setManual();
     NameNode.stateChangeLog.info("STATE* Safe mode is ON. " 
                                 + safeMode.getTurnOffTip());