You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/29 09:10:53 UTC

svn commit: r1152128 [1/3] - in /hadoop/common/branches/HDFS-1073/hdfs: ./ src/c++/libhdfs/ src/contrib/ src/contrib/hdfsproxy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/...

Author: todd
Date: Fri Jul 29 07:10:48 2011
New Revision: 1152128

URL: http://svn.apache.org/viewvc?rev=1152128&view=rev
Log:
Merge trunk into HDFS-1073.

Resolved several conflicts due to merge of HDFS-2149 and HDFS-2212.
Changes during resolution were:
- move the writing of the transaction ID out of EditLogOutputStream to
  FSEditLogOp.Writer to match trunk's organization
- remove JSPOOL-related FSEditLogOp subclasses, add LogSegmentOp subclasses
- modify TestEditLogJournalFailures to not keep trying to use streams after
  the simulated halt, since newer stricter assertions caused these writes to
  fail

Added:
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
      - copied, changed from r1151750, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
      - copied unchanged from r1151750, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditsDoubleBuffer.java
      - copied unchanged from r1151750, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditsDoubleBuffer.java
Removed:
    hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/
    hadoop/common/branches/HDFS-1073/hdfs/src/docs/src/documentation/content/xdocs/hdfsproxy.xml
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
Modified:
    hadoop/common/branches/HDFS-1073/hdfs/   (props changed)
    hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-1073/hdfs/build.xml
    hadoop/common/branches/HDFS-1073/hdfs/src/c++/libhdfs/   (props changed)
    hadoop/common/branches/HDFS-1073/hdfs/src/contrib/build.xml
    hadoop/common/branches/HDFS-1073/hdfs/src/docs/src/documentation/content/xdocs/site.xml
    hadoop/common/branches/HDFS-1073/hdfs/src/java/   (props changed)
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/aop/org/apache/hadoop/hdfs/TestFiPipelines.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/   (props changed)
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeDeath.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileConcurrentReader.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreationClient.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreationDelete.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreationEmpty.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
    hadoop/common/branches/HDFS-1073/hdfs/src/webapps/datanode/   (props changed)
    hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/   (props changed)
    hadoop/common/branches/HDFS-1073/hdfs/src/webapps/secondary/   (props changed)

Propchange: hadoop/common/branches/HDFS-1073/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 07:10:48 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs:1134994-1150966
+/hadoop/common/trunk/hdfs:1134994-1151750
 /hadoop/core/branches/branch-0.19/hdfs:713112
 /hadoop/hdfs/branches/HDFS-1052:987665-1095512
 /hadoop/hdfs/branches/HDFS-265:796829-820463

Modified: hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt Fri Jul 29 07:10:48 2011
@@ -9,6 +9,8 @@ Trunk (unreleased changes)
 
     HDFS-1536. Improve HDFS WebUI. (hairong)
 
+    HDFS-2210. Remove hdfsproxy. (eli)
+
   NEW FEATURES
 
     HDFS-1359. Add BlockPoolID to Block. (suresh)
@@ -600,6 +602,22 @@ Trunk (unreleased changes)
     cause. (Ravi Prakash via atm)
 
     HDFS-2180. Refactor NameNode HTTP server into new class. (todd)
+    
+    HDFS-2198. Remove hardcoded configuration keys. (suresh)
+
+    HDFS-2149. Move EditLogOp serialization formats into FsEditLogOp
+    implementations. (Ivan Kelly via todd)
+
+    HDFS-2191. Move datanodeMap from FSNamesystem to DatanodeManager.
+    (szetszwo)
+
+    HDFS-2200. Change FSNamesystem.LOG to package private. (szetszwo)
+
+    HDFS-2195. Refactor StorageDirectory to not be a non-static inner class.
+    (todd via eli)
+
+    HDFS-2212. Refactor double-buffering code out of EditLogOutputStreams.
+    (todd via eli)
 
   OPTIMIZATIONS
 
@@ -1370,6 +1388,9 @@ Release 0.22.0 - Unreleased
     HDFS-2071. Use of isConnected() in DataXceiver is invalid. (Kihwal Lee
     via todd)
 
+    HDFS-1981. NameNode does not saveNamespace() when editsNew is empty.
+    (Uma Maheswara Rao G via shv)
+
 Release 0.21.1 - Unreleased
     HDFS-1466. TestFcHdfsSymlink relies on /tmp/test not existing. (eli)
 

Modified: hadoop/common/branches/HDFS-1073/hdfs/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/build.xml?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/build.xml (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/build.xml Fri Jul 29 07:10:48 2011
@@ -1397,7 +1397,6 @@
         <exclude name="src/c++/libhdfs/install-sh" />
         <exclude name="src/c++/libhdfs/ltmain.sh" />
         <exclude name="src/c++/libhdfs/missing" />
-        <exclude name="src/contrib/hdfsproxy/src/test/resources/" />
         <exclude name="src/test/checkstyle-noframes-sorted.xsl" />
         <exclude name="src/test/checkstyle.xml" />
         <exclude name="src/test/findbugsExcludeFile.xml" />

Propchange: hadoop/common/branches/HDFS-1073/hdfs/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 07:10:48 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs/src/c++/libhdfs:1134994-1150966
+/hadoop/common/trunk/hdfs/src/c++/libhdfs:1134994-1151750
 /hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
 /hadoop/core/trunk/src/c++/libhdfs:776175-784663
 /hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs:987665-1095512

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/contrib/build.xml?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/contrib/build.xml (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/contrib/build.xml Fri Jul 29 07:10:48 2011
@@ -48,12 +48,6 @@
     <subant target="test">
       <fileset dir="." includes="fuse-dfs/build.xml"/>
     </subant> 
-
-    <!-- hdfsproxy tests failing due to HDFS-1666
-    <subant target="test">
-      <fileset dir="." includes="hdfsproxy/build.xml"/>
-    </subant>
-      -->
   </target>
   
   

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/docs/src/documentation/content/xdocs/site.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/docs/src/documentation/content/xdocs/site.xml?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/docs/src/documentation/content/xdocs/site.xml (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/docs/src/documentation/content/xdocs/site.xml Fri Jul 29 07:10:48 2011
@@ -43,7 +43,6 @@ See http://forrest.apache.org/docs/linki
       <hdfs_SLG        		label="Synthetic Load Generator"  href="SLG_user_guide.html" />
       <hdfs_imageviewer	label="Offline Image Viewer"	href="hdfs_imageviewer.html" />
       <hdfs_editsviewer	label="Offline Edits Viewer"	href="hdfs_editsviewer.html" />
-      <hdfsproxy 			    label="HDFS Proxy" href="hdfsproxy.html"/>
       <hftp 			    label="HFTP" href="hftp.html"/>
       <faultinject_framework label="Fault Injection"  href="faultinject_framework.html" /> 
       <hdfs_libhdfs   		label="C API libhdfs" href="libhdfs.html" /> 

Propchange: hadoop/common/branches/HDFS-1073/hdfs/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 07:10:48 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs/src/java:1134994-1150966
+/hadoop/common/trunk/hdfs/src/java:1134994-1151750
 /hadoop/core/branches/branch-0.19/hdfs/src/java:713112
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-1052/src/java:987665-1095512

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java Fri Jul 29 07:10:48 2011
@@ -53,6 +53,7 @@ import org.apache.hadoop.fs.ParentNotDir
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@@ -149,48 +150,38 @@ public class DFSClient implements FSCons
 
     Conf(Configuration conf) {
       maxBlockAcquireFailures = conf.getInt(
-          DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
-          DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
-      confTime = conf.getInt(
-          DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
+          DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
+          DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
+      confTime = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
           HdfsConstants.WRITE_TIMEOUT);
       ioBufferSize = conf.getInt(
           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
-      bytesPerChecksum = conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
-          DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
-      socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+      bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
+          DFS_BYTES_PER_CHECKSUM_DEFAULT);
+      socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
           HdfsConstants.READ_TIMEOUT);
       /** dfs.write.packet.size is an internal config variable */
-      writePacketSize = conf.getInt(
-          DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
-          DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
-      defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
+      writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+          DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+      defaultBlockSize = conf.getLong(DFS_BLOCK_SIZE_KEY,
           DEFAULT_BLOCK_SIZE);
       defaultReplication = (short) conf.getInt(
-          DFSConfigKeys.DFS_REPLICATION_KEY,
-          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+          DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);
       taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
-      socketCacheCapacity = conf.getInt(
-          DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
-          DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
-      prefetchSize = conf.getLong(
-          DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
+      socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
+          DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+      prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
           10 * defaultBlockSize);
       timeWindow = conf
-          .getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
-      nCachedConnRetry = conf.getInt(
-          DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
-          DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
-      nBlockWriteRetry = conf.getInt(
-          DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
-          DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
+          .getInt(DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
+      nCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
+          DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
+      nBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
+          DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
       nBlockWriteLocateFollowingRetry = conf
-          .getInt(
-              DFSConfigKeys
-              .DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
-              DFSConfigKeys
-              .DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
+          .getInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
+              DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
       uMask = FsPermission.getUMask(conf);
     }
   }

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jul 29 07:10:48 2011
@@ -42,9 +42,11 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
@@ -53,6 +55,8 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
 
@@ -156,7 +160,7 @@ public class BlockManager {
   public final int defaultReplication;
   /** The maximum number of entries returned by getCorruptInodes() */
   final int maxCorruptFilesReturned;
-  
+
   /** variable to enable check for enough racks */
   final boolean shouldCheckForEnoughRacks;
 
@@ -208,12 +212,12 @@ public class BlockManager {
     this.replicationRecheckInterval = 
       conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
                   DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
-    FSNamesystem.LOG.info("defaultReplication = " + defaultReplication);
-    FSNamesystem.LOG.info("maxReplication = " + maxReplication);
-    FSNamesystem.LOG.info("minReplication = " + minReplication);
-    FSNamesystem.LOG.info("maxReplicationStreams = " + maxReplicationStreams);
-    FSNamesystem.LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
-    FSNamesystem.LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
+    LOG.info("defaultReplication = " + defaultReplication);
+    LOG.info("maxReplication     = " + maxReplication);
+    LOG.info("minReplication     = " + minReplication);
+    LOG.info("maxReplicationStreams      = " + maxReplicationStreams);
+    LOG.info("shouldCheckForEnoughRacks  = " + shouldCheckForEnoughRacks);
+    LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
   }
 
   public void activate(Configuration conf) {
@@ -294,15 +298,14 @@ public class BlockManager {
       }
     }
 
-    //
     // Dump blocks from pendingReplication
-    //
     pendingReplications.metaSave(out);
 
-    //
     // Dump blocks that are waiting to be deleted
-    //
     dumpRecentInvalidateSets(out);
+
+    // Dump all datanodes
+    getDatanodeManager().datanodeDump(out);
   }
 
   /**
@@ -341,9 +344,7 @@ public class BlockManager {
         namesystem.dir.updateSpaceConsumed(path, 0, -diff
             * fileINode.getReplication());
       } catch (IOException e) {
-        FSNamesystem.LOG
-            .warn("Unexpected exception while updating disk space : "
-                + e.getMessage());
+        LOG.warn("Unexpected exception while updating disk space.", e);
       }
     }
   }
@@ -453,7 +454,7 @@ public class BlockManager {
   /**
    * Get all valid locations of the block
    */
-  public ArrayList<String> getValidLocations(Block block) {
+  private List<String> getValidLocations(Block block) {
     ArrayList<String> machineSet =
       new ArrayList<String>(blocksMap.numNodes(block));
     for(Iterator<DatanodeDescriptor> it =
@@ -514,7 +515,7 @@ public class BlockManager {
     final int numCorruptNodes = countNodes(blk).corruptReplicas();
     final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
     if (numCorruptNodes != numCorruptReplicas) {
-      FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
+      LOG.warn("Inconsistent number of corrupt replicas for "
           + blk + " blockMap has " + numCorruptNodes
           + " but corrupt replicas map has " + numCorruptReplicas);
     }
@@ -562,6 +563,49 @@ public class BlockManager {
                             minReplication);
   }
 
+   /** Get all blocks with location information from a datanode. */
+  public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
+      final long size) throws UnregisteredNodeException {
+    final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
+    if (node == null) {
+      NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+          + "Asking for blocks from an unrecorded node " + datanode.getName());
+      throw new HadoopIllegalArgumentException(
+          "Datanode " + datanode.getName() + " not found.");
+    }
+
+    int numBlocks = node.numBlocks();
+    if(numBlocks == 0) {
+      return new BlocksWithLocations(new BlockWithLocations[0]);
+    }
+    Iterator<BlockInfo> iter = node.getBlockIterator();
+    int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block
+    // skip blocks
+    for(int i=0; i<startBlock; i++) {
+      iter.next();
+    }
+    List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+    long totalSize = 0;
+    BlockInfo curBlock;
+    while(totalSize<size && iter.hasNext()) {
+      curBlock = iter.next();
+      if(!curBlock.isComplete())  continue;
+      totalSize += addBlock(curBlock, results);
+    }
+    if(totalSize<size) {
+      iter = node.getBlockIterator(); // start from the beginning
+      for(int i=0; i<startBlock&&totalSize<size; i++) {
+        curBlock = iter.next();
+        if(!curBlock.isComplete())  continue;
+        totalSize += addBlock(curBlock, results);
+      }
+    }
+
+    return new BlocksWithLocations(
+        results.toArray(new BlockWithLocations[results.size()]));
+  }
+
+   
   /** Remove a datanode. */
   public void removeDatanode(final DatanodeDescriptor node) {
     final Iterator<? extends Block> it = node.getBlockIterator();
@@ -660,7 +704,7 @@ public class BlockManager {
     for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
       Collection<Block> blocks = entry.getValue();
       if (blocks.size() > 0) {
-        out.println(namesystem.getDatanode(entry.getKey()).getName() + blocks);
+        out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
       }
     }
   }
@@ -684,7 +728,7 @@ public class BlockManager {
   private void markBlockAsCorrupt(BlockInfo storedBlock,
                                   DatanodeInfo dn) throws IOException {
     assert storedBlock != null : "storedBlock should not be null";
-    DatanodeDescriptor node = namesystem.getDatanode(dn);
+    DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
       throw new IOException("Cannot mark block " + 
                             storedBlock.getBlockName() +
@@ -723,7 +767,7 @@ public class BlockManager {
       throws IOException {
     NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
                                  + blk + " on " + dn.getName());
-    DatanodeDescriptor node = namesystem.getDatanode(dn);
+    DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
       throw new IOException("Cannot invalidate block " + blk +
                             " because datanode " + dn.getName() +
@@ -748,7 +792,7 @@ public class BlockManager {
     }
   }
 
-  public void updateState() {
+  void updateState() {
     pendingReplicationBlocksCount = pendingReplications.size();
     underReplicatedBlocksCount = neededReplications.size();
     corruptReplicaBlocksCount = corruptReplicas.size();
@@ -869,7 +913,7 @@ public class BlockManager {
           Block block = neededReplicationsIterator.next();
           int priority = neededReplicationsIterator.getPriority();
           if (priority < 0 || priority >= blocksToReplicate.size()) {
-            FSNamesystem.LOG.warn("Unexpected replication priority: "
+            LOG.warn("Unexpected replication priority: "
                 + priority + " " + block);
           } else {
             blocksToReplicate.get(priority).add(block);
@@ -1134,7 +1178,7 @@ public class BlockManager {
    * If there were any replication requests that timed out, reap them
    * and put them back into the neededReplication queue
    */
-  public void processPendingReplications() {
+  private void processPendingReplications() {
     Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
     if (timedOutItems != null) {
       namesystem.writeLock();
@@ -1338,8 +1382,8 @@ public class BlockManager {
       Collection<BlockInfo> toCorrupt,
       Collection<StatefulBlockInfo> toUC) {
     
-    if(FSNamesystem.LOG.isDebugEnabled()) {
-      FSNamesystem.LOG.debug("Reported block " + block
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Reported block " + block
           + " on " + dn.getName() + " size " + block.getNumBytes()
           + " replicaState = " + reportedState);
     }
@@ -1355,8 +1399,8 @@ public class BlockManager {
     BlockUCState ucState = storedBlock.getBlockUCState();
     
     // Block is on the NN
-    if(FSNamesystem.LOG.isDebugEnabled()) {
-      FSNamesystem.LOG.debug("In memory blockUCState = " + ucState);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("In memory blockUCState = " + ucState);
     }
 
     // Ignore replicas already scheduled to be removed from the DN
@@ -1411,7 +1455,7 @@ public class BlockManager {
     case RUR:       // should not be reported
     case TEMPORARY: // should not be reported
     default:
-      FSNamesystem.LOG.warn("Unexpected replica state " + reportedState
+      LOG.warn("Unexpected replica state " + reportedState
           + " for block: " + storedBlock + 
           " on " + dn.getName() + " size " + storedBlock.getNumBytes());
       return true;
@@ -1579,7 +1623,7 @@ public class BlockManager {
     int corruptReplicasCount = corruptReplicas.numCorruptReplicas(storedBlock);
     int numCorruptNodes = num.corruptReplicas();
     if (numCorruptNodes != corruptReplicasCount) {
-      FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for " +
+      LOG.warn("Inconsistent number of corrupt replicas for " +
           storedBlock + "blockMap has " + numCorruptNodes + 
           " but corrupt replicas map has " + corruptReplicasCount);
     }
@@ -1662,10 +1706,10 @@ public class BlockManager {
     } finally {
       namesystem.writeUnlock();
     }
-    FSNamesystem.LOG.info("Total number of blocks = " + blocksMap.size());
-    FSNamesystem.LOG.info("Number of invalid blocks = " + nrInvalid);
-    FSNamesystem.LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
-    FSNamesystem.LOG.info("Number of  over-replicated blocks = " + nrOverReplicated);
+    LOG.info("Total number of blocks            = " + blocksMap.size());
+    LOG.info("Number of invalid blocks          = " + nrInvalid);
+    LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
+    LOG.info("Number of  over-replicated blocks = " + nrOverReplicated);
   }
 
   /**
@@ -1700,6 +1744,7 @@ public class BlockManager {
         addedNode, delNodeHint, blockplacement);
   }
 
+
   public void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
     Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
@@ -1774,6 +1819,21 @@ public class BlockManager {
   }
 
   /**
+   * Append the block, with its valid locations, to results when it has any.
+   * @return block.getNumBytes() if the block was added; 0 if it was not
+   */
+  private long addBlock(Block block, List<BlockWithLocations> results) {
+    final List<String> machineSet = getValidLocations(block);
+    if(machineSet.size() == 0) {
+      return 0;
+    } else {
+      results.add(new BlockWithLocations(block, 
+          machineSet.toArray(new String[machineSet.size()])));
+      return block.getNumBytes();
+    }
+  }
+
+  /**
    * The given node is reporting that it received a certain block.
    */
   public void addBlock(DatanodeDescriptor node, Block block, String delHint)
@@ -1784,7 +1844,7 @@ public class BlockManager {
     // get the deletion hint node
     DatanodeDescriptor delHintNode = null;
     if (delHint != null && delHint.length() != 0) {
-      delHintNode = namesystem.getDatanode(delHint);
+      delHintNode = datanodeManager.getDatanode(delHint);
       if (delHintNode == null) {
         NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
             + block + " is expected to be removed from an unrecorded node "
@@ -1893,7 +1953,7 @@ public class BlockManager {
       nodeList.append(node.name);
       nodeList.append(" ");
     }
-    FSNamesystem.LOG.info("Block: " + block + ", Expected Replicas: "
+    LOG.info("Block: " + block + ", Expected Replicas: "
         + curExpectedReplicas + ", live replicas: " + curReplicas
         + ", corrupt replicas: " + num.corruptReplicas()
         + ", decommissioned replicas: " + num.decommissionedReplicas()
@@ -2071,7 +2131,7 @@ public class BlockManager {
         return 0;
       // get blocks to invalidate for the nodeId
       assert nodeId != null;
-      DatanodeDescriptor dn = namesystem.getDatanode(nodeId);
+      final DatanodeDescriptor dn = datanodeManager.getDatanode(nodeId);
       if (dn == null) {
         removeFromInvalidates(nodeId);
         return 0;
@@ -2082,11 +2142,11 @@ public class BlockManager {
         return 0;
 
       ArrayList<Block> blocksToInvalidate = new ArrayList<Block>(
-          namesystem.blockInvalidateLimit);
+          getDatanodeManager().blockInvalidateLimit);
 
       // # blocks that can be sent in one message is limited
       Iterator<Block> it = invalidateSet.iterator();
-      for (int blkCount = 0; blkCount < namesystem.blockInvalidateLimit
+      for (int blkCount = 0; blkCount < getDatanodeManager().blockInvalidateLimit
           && it.hasNext(); blkCount++) {
         blocksToInvalidate.add(it.next());
         it.remove();

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Fri Jul 29 07:10:48 2011
@@ -22,6 +22,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -38,7 +40,8 @@ import org.apache.hadoop.util.Reflection
  */
 @InterfaceAudience.Private
 public abstract class BlockPlacementPolicy {
-    
+  static final Log LOG = LogFactory.getLog(BlockPlacementPolicy.class);
+
   @InterfaceAudience.Private
   public static class NotEnoughReplicasException extends Exception {
     private static final long serialVersionUID = 1L;

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Fri Jul 29 07:10:48 2011
@@ -212,7 +212,7 @@ public class BlockPlacementPolicyDefault
       chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
                    blocksize, maxNodesPerRack, results);
     } catch (NotEnoughReplicasException e) {
-      FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
+      LOG.warn("Not able to place enough replicas, still in need of "
                + numOfReplicas + " to reach " + totalReplicasExpected + "\n"
                + e.getMessage());
     }
@@ -343,7 +343,7 @@ public class BlockPlacementPolicyDefault
     int numOfAvailableNodes =
       clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
     StringBuilder builder = null;
-    if (FSNamesystem.LOG.isDebugEnabled()) {
+    if (LOG.isDebugEnabled()) {
       builder = threadLocalBuilder.get();
       builder.setLength(0);
       builder.append("[");
@@ -366,7 +366,7 @@ public class BlockPlacementPolicyDefault
     }
 
     String detail = enableDebugLogging;
-    if (FSNamesystem.LOG.isDebugEnabled()) {
+    if (LOG.isDebugEnabled()) {
       if (badTarget && builder != null) {
         detail = builder.append("]").toString();
         builder.setLength(0);
@@ -388,7 +388,7 @@ public class BlockPlacementPolicyDefault
     int numOfAvailableNodes =
       clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
     StringBuilder builder = null;
-    if (FSNamesystem.LOG.isDebugEnabled()) {
+    if (LOG.isDebugEnabled()) {
       builder = threadLocalBuilder.get();
       builder.setLength(0);
       builder.append("[");
@@ -412,7 +412,7 @@ public class BlockPlacementPolicyDefault
       
     if (numOfReplicas>0) {
       String detail = enableDebugLogging;
-      if (FSNamesystem.LOG.isDebugEnabled()) {
+      if (LOG.isDebugEnabled()) {
         if (badTarget && builder != null) {
           detail = builder.append("]").toString();
           builder.setLength(0);
@@ -439,7 +439,7 @@ public class BlockPlacementPolicyDefault
                                List<DatanodeDescriptor> results) {
     // check if the node is (being) decommissed
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      if(FSNamesystem.LOG.isDebugEnabled()) {
+      if(LOG.isDebugEnabled()) {
         threadLocalBuilder.get().append(node.toString()).append(": ")
           .append("Node ").append(NodeBase.getPath(node))
           .append(" is not chosen because the node is (being) decommissioned ");
@@ -451,7 +451,7 @@ public class BlockPlacementPolicyDefault
                      (node.getBlocksScheduled() * blockSize); 
     // check the remaining capacity of the target machine
     if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
-      if(FSNamesystem.LOG.isDebugEnabled()) {
+      if(LOG.isDebugEnabled()) {
         threadLocalBuilder.get().append(node.toString()).append(": ")
           .append("Node ").append(NodeBase.getPath(node))
           .append(" is not chosen because the node does not have enough space ");
@@ -467,7 +467,7 @@ public class BlockPlacementPolicyDefault
         avgLoad = (double)stats.getTotalLoad()/size;
       }
       if (node.getXceiverCount() > (2.0 * avgLoad)) {
-        if(FSNamesystem.LOG.isDebugEnabled()) {
+        if(LOG.isDebugEnabled()) {
           threadLocalBuilder.get().append(node.toString()).append(": ")
             .append("Node ").append(NodeBase.getPath(node))
             .append(" is not chosen because the node is too busy ");
@@ -487,7 +487,7 @@ public class BlockPlacementPolicyDefault
       }
     }
     if (counter>maxTargetPerLoc) {
-      if(FSNamesystem.LOG.isDebugEnabled()) {
+      if(LOG.isDebugEnabled()) {
         threadLocalBuilder.get().append(node.toString()).append(": ")
           .append("Node ").append(NodeBase.getPath(node))
           .append(" is not chosen because the rack has too many chosen nodes ");

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Fri Jul 29 07:10:48 2011
@@ -19,16 +19,22 @@ package org.apache.hadoop.hdfs.server.bl
 
 import java.io.DataInput;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.io.WritableUtils;
 
 /**************************************************
@@ -326,7 +332,7 @@ public class DatanodeDescriptor extends 
   void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
     if(recoverBlocks.contains(block)) {
       // this prevents adding the same block twice to the recovery queue
-      FSNamesystem.LOG.info("Block " + block +
+      BlockManager.LOG.info("Block " + block +
                             " is already in the recovery queue.");
       return;
     }

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Jul 29 07:10:48 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NavigableMap;
 import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,12 +38,22 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.ipc.Server;
@@ -48,6 +61,7 @@ import org.apache.hadoop.net.CachedDNSTo
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.util.CyclicIteration;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -62,6 +76,30 @@ public class DatanodeManager {
 
   final FSNamesystem namesystem;
 
+  /**
+   * Stores the datanode -> block map.  
+   * <p>
+   * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by 
+   * storage id. In order to keep the storage map consistent it tracks 
+   * all storages ever registered with the namenode.
+   * A descriptor corresponding to a specific storage id can be
+   * <ul> 
+   * <li>added to the map if it is a new storage id;</li>
+   * <li>updated with a new datanode started as a replacement for the old one 
+   * with the same storage id; and </li>
+   * <li>removed if and only if an existing datanode is restarted to serve a
+   * different storage id.</li>
+   * </ul> <br>
+   * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
+   * in the namespace image file. Only the {@link DatanodeInfo} part is 
+   * persistent, the list of blocks is restored from the datanode block
+   * reports. 
+   * <p>
+   * Mapping: StorageID -> DatanodeDescriptor
+   */
+  private final NavigableMap<String, DatanodeDescriptor> datanodeMap
+      = new TreeMap<String, DatanodeDescriptor>();
+
   /** Cluster network topology */
   private final NetworkTopology networktopology = new NetworkTopology();
 
@@ -71,7 +109,12 @@ public class DatanodeManager {
   private final DNSToSwitchMapping dnsToSwitchMapping;
 
   /** Read include/exclude files*/
-  private final HostsFileReader hostsReader; 
+  private final HostsFileReader hostsReader;
+
+  /** The period to wait for datanode heartbeat.*/
+  private final long heartbeatExpireInterval;
+  /** Ask Datanode only up to this many blocks to delete. */
+  final int blockInvalidateLimit;
   
   DatanodeManager(final FSNamesystem namesystem, final Configuration conf
       ) throws IOException {
@@ -90,6 +133,19 @@ public class DatanodeManager {
     if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
       dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
     }
+    
+    final long heartbeatIntervalSeconds = conf.getLong(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+    final int heartbeatRecheckInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
+    this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
+        + 10 * 1000 * heartbeatIntervalSeconds;
+    this.blockInvalidateLimit = Math.max(20*(int)(heartbeatIntervalSeconds),
+        DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
+    LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
+        + "=" + this.blockInvalidateLimit);
   }
 
   private Daemon decommissionthread = null;
@@ -124,20 +180,88 @@ public class DatanodeManager {
       Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
     }    
   }
-  
+
+  CyclicIteration<String, DatanodeDescriptor> getDatanodeCyclicIteration(
+      final String firstkey) {
+    return new CyclicIteration<String, DatanodeDescriptor>(
+        datanodeMap, firstkey); // the map itself is passed; no copy is made here
+  }
+
   /** @return the datanode descriptor for the host. */
   public DatanodeDescriptor getDatanodeByHost(final String host) {
     return host2DatanodeMap.getDatanodeByHost(host);
   }
 
+  /** @return the descriptor stored under storageID, or null if there is none. */
+  DatanodeDescriptor getDatanode(final String storageID) {
+    return datanodeMap.get(storageID);
+  }
+
+  /**
+   * Get data node by storage ID, verifying that its name matches nodeID.
+   * 
+   * @param nodeID supplies the storage ID to look up and the expected name
+   * @return DatanodeDescriptor or null if the node is not found.
+   * @throws UnregisteredNodeException if the found node's name differs
+   */
+  public DatanodeDescriptor getDatanode(DatanodeID nodeID
+      ) throws UnregisteredNodeException {
+    final DatanodeDescriptor node = getDatanode(nodeID.getStorageID());
+    if (node == null) 
+      return null;
+    if (!node.getName().equals(nodeID.getName())) {
+      final UnregisteredNodeException e = new UnregisteredNodeException(
+          nodeID, node);
+      NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: "
+                                    + e.getLocalizedMessage());
+      throw e;
+    }
+    return node;
+  }
+
+  /** Prints the datanode count, then each node's dumpDatanode() output. */
+  void datanodeDump(final PrintWriter out) {
+    synchronized (datanodeMap) {
+      out.println("Metasave: Number of datanodes: " + datanodeMap.size());
+      for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
+        DatanodeDescriptor node = it.next();
+        out.println(node.dumpDatanode());
+      }
+    }
+  }
+
+  /** Remove the datanode, but only if it is found and isDatanodeDead(). */
+  public void removeDeadDatanode(final DatanodeID nodeID) {
+    synchronized(namesystem.heartbeats) {
+      synchronized(datanodeMap) {
+        DatanodeDescriptor d;
+        try {
+          d = getDatanode(nodeID);
+        } catch(IOException e) {
+          d = null; // a failed lookup is handled the same way as "not found"
+        }
+        if (d != null && isDatanodeDead(d)) {
+          NameNode.stateChangeLog.info(
+              "BLOCK* removeDeadDatanode: lost heartbeat from " + d.getName());
+          namesystem.removeDatanode(d);
+        }
+      }
+    }
+  }
+
+  /** @return true if the last update is older than heartbeatExpireInterval. */
+  public boolean isDatanodeDead(DatanodeDescriptor node) {
+    return (node.getLastUpdate() <
+            (Util.now() - heartbeatExpireInterval));
+  }
+
   /** Add a datanode. */
   private void addDatanode(final DatanodeDescriptor node) {
     // To keep host2DatanodeMap consistent with datanodeMap,
     // remove  from host2DatanodeMap the datanodeDescriptor removed
     // from datanodeMap before adding node to host2DatanodeMap.
-    synchronized (namesystem.datanodeMap) {
-      host2DatanodeMap.remove(
-          namesystem.datanodeMap.put(node.getStorageID(), node));
+    synchronized(datanodeMap) {
+      host2DatanodeMap.remove(datanodeMap.put(node.getStorageID(), node));
     }
 
     host2DatanodeMap.add(node);
@@ -152,8 +276,8 @@ public class DatanodeManager {
   /** Physically remove node from datanodeMap. */
   private void wipeDatanode(final DatanodeID node) throws IOException {
     final String key = node.getStorageID();
-    synchronized (namesystem.datanodeMap) {
-      host2DatanodeMap.remove(namesystem.datanodeMap.remove(key));
+    synchronized (datanodeMap) {
+      host2DatanodeMap.remove(datanodeMap.remove(key));
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".wipeDatanode("
@@ -315,7 +439,7 @@ public class DatanodeManager {
     String newID = null;
     while(newID == null) {
       newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
-      if (namesystem.datanodeMap.get(newID) != null)
+      if (datanodeMap.get(newID) != null)
         newID = null;
     }
     return newID;
@@ -350,7 +474,7 @@ public class DatanodeManager {
         + "node registration from " + nodeReg.getName()
         + " storage " + nodeReg.getStorageID());
 
-    DatanodeDescriptor nodeS = namesystem.datanodeMap.get(nodeReg.getStorageID());
+    DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
     DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getName());
       
     if (nodeN != null && nodeN != nodeS) {
@@ -461,7 +585,7 @@ public class DatanodeManager {
    * 4. Removed from exclude --> stop decommission.
    */
   public void refreshDatanodes() throws IOException {
-    for(DatanodeDescriptor node : namesystem.datanodeMap.values()) {
+    for(DatanodeDescriptor node : datanodeMap.values()) {
       // Check if not include.
       if (!inHostsList(node, null)) {
         node.setDisallowed(true);  // case 2.
@@ -475,6 +599,45 @@ public class DatanodeManager {
     }
   }
 
+  /** @return the number of datanodes in datanodeMap that are not dead. */
+  public int getNumLiveDataNodes() {
+    int numLive = 0;
+    synchronized (datanodeMap) {
+      for(DatanodeDescriptor dn : datanodeMap.values()) {
+        if (!isDatanodeDead(dn) ) {
+          numLive++;
+        }
+      }
+    }
+    return numLive;
+  }
+
+  /** @return the number of datanodes in datanodeMap that are dead. */
+  public int getNumDeadDataNodes() {
+    int numDead = 0;
+    synchronized (datanodeMap) {   
+      for(DatanodeDescriptor dn : datanodeMap.values()) {
+        if (isDatanodeDead(dn) ) {
+          numDead++;
+        }
+      }
+    }
+    return numDead;
+  }
+
+  /** Append each datanode in the ALL report to either live or dead. */
+  public void fetchDatanodess(final List<DatanodeDescriptor> live, 
+      final List<DatanodeDescriptor> dead) {
+    final List<DatanodeDescriptor> results =
+        getDatanodeListForReport(DatanodeReportType.ALL);    
+    for(DatanodeDescriptor node : results) {
+      if (isDatanodeDead(node))
+        dead.add(node);
+      else
+        live.add(node);
+    }
+  }
+
   /** For generating datanode reports */
   public List<DatanodeDescriptor> getDatanodeListForReport(
       final DatanodeReportType type) {
@@ -499,13 +662,13 @@ public class DatanodeManager {
 
     ArrayList<DatanodeDescriptor> nodes = null;
     
-    synchronized (namesystem.datanodeMap) {
-      nodes = new ArrayList<DatanodeDescriptor>(namesystem.datanodeMap.size() + 
+    synchronized(datanodeMap) {
+      nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() + 
                                                 mustList.size());
-      Iterator<DatanodeDescriptor> it = namesystem.datanodeMap.values().iterator();
+      Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
       while (it.hasNext()) { 
         DatanodeDescriptor dn = it.next();
-        boolean isDead = namesystem.isDatanodeDead(dn);
+        final boolean isDead = isDatanodeDead(dn);
         if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
           nodes.add(dn);
         }
@@ -537,4 +700,77 @@ public class DatanodeManager {
     }
     return nodes;
   }
+  
+  private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
+    node.setLastUpdate(0); // zero the timestamp that isDatanodeDead() compares
+  }
+
+  /** Handle a datanode heartbeat; returns its commands, or null if none. */
+  public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
+      final String blockPoolId,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      int xceiverCount, int maxTransfers, int failedVolumes
+      ) throws IOException {
+    synchronized (namesystem.heartbeats) {
+      synchronized (datanodeMap) {
+        DatanodeDescriptor nodeinfo = null;
+        try {
+          nodeinfo = getDatanode(nodeReg);
+        } catch(UnregisteredNodeException e) {
+          return new DatanodeCommand[]{DatanodeCommand.REGISTER};
+        }
+        
+        // Check if this datanode should actually be shutdown instead. 
+        if (nodeinfo != null && nodeinfo.isDisallowed()) {
+          setDatanodeDead(nodeinfo);
+          throw new DisallowedDatanodeException(nodeinfo);
+        }
+         
+        if (nodeinfo == null || !nodeinfo.isAlive) {
+          return new DatanodeCommand[]{DatanodeCommand.REGISTER};
+        }
+
+        namesystem.updateStats(nodeinfo, false);
+        nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
+            xceiverCount, failedVolumes);
+        namesystem.updateStats(nodeinfo, true);
+        
+        // check lease recovery; if present, it is the only command returned
+        BlockInfoUnderConstruction[] blocks = nodeinfo
+            .getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        if (blocks != null) {
+          BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
+              blocks.length);
+          for (BlockInfoUnderConstruction b : blocks) {
+            brCommand.add(new RecoveringBlock(
+                new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
+                    .getBlockRecoveryId()));
+          }
+          return new DatanodeCommand[] { brCommand };
+        }
+
+        final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
+        // check pending replication, requesting at most maxTransfers
+        List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
+              maxTransfers);
+        if (pendingList != null) {
+          cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+              pendingList));
+        }
+        // check block invalidation, requesting at most blockInvalidateLimit
+        Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+        if (blks != null) {
+          cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
+              blockPoolId, blks));
+        }
+        
+        namesystem.addKeyUpdateCommand(cmds, nodeinfo);
+        if (!cmds.isEmpty()) {
+          return cmds.toArray(new DatanodeCommand[cmds.size()]);
+        }
+      }
+    }
+
+    return null;
+  }
 }

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java Fri Jul 29 07:10:48 2011
@@ -83,8 +83,8 @@ class DecommissionManager {
     private void check() {
       int count = 0;
       for(Map.Entry<String, DatanodeDescriptor> entry
-          : new CyclicIteration<String, DatanodeDescriptor>(
-              fsnamesystem.datanodeMap, firstkey)) {
+          : blockManager.getDatanodeManager().getDatanodeCyclicIteration(
+              firstkey)) {
         final DatanodeDescriptor d = entry.getValue();
         firstkey = entry.getKey();
 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Fri Jul 29 07:10:48 2011
@@ -17,14 +17,18 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-
 import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.util.*;
-import java.io.*;
-import java.util.*;
+
+import java.io.PrintWriter;
 import java.sql.Time;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.util.Daemon;
 
 /***************************************************
  * PendingReplicationBlocks does the bookkeeping of all
@@ -38,6 +42,8 @@ import java.sql.Time;
  *
  ***************************************************/
 class PendingReplicationBlocks {
+  private static final Log LOG = BlockManager.LOG;
+
   private Map<Block, PendingBlockInfo> pendingReplications;
   private ArrayList<Block> timedOutItems;
   Daemon timerThread = null;
@@ -87,9 +93,8 @@ class PendingReplicationBlocks {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found != null) {
-        if(FSNamesystem.LOG.isDebugEnabled()) {
-          FSNamesystem.LOG.debug("Removing pending replication for block" +
-              block);
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Removing pending replication for " + block);
         }
         found.decrementReplicas();
         if (found.getNumReplicas() <= 0) {
@@ -186,9 +191,8 @@ class PendingReplicationBlocks {
           pendingReplicationCheck();
           Thread.sleep(period);
         } catch (InterruptedException ie) {
-          if(FSNamesystem.LOG.isDebugEnabled()) {
-            FSNamesystem.LOG.debug(
-                "PendingReplicationMonitor thread received exception. " + ie);
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("PendingReplicationMonitor thread is interrupted.", ie);
           }
         }
       }
@@ -202,8 +206,8 @@ class PendingReplicationBlocks {
         Iterator<Map.Entry<Block, PendingBlockInfo>> iter =
                                     pendingReplications.entrySet().iterator();
         long now = now();
-        if(FSNamesystem.LOG.isDebugEnabled()) {
-          FSNamesystem.LOG.debug("PendingReplicationMonitor checking Q");
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("PendingReplicationMonitor checking Q");
         }
         while (iter.hasNext()) {
           Map.Entry<Block, PendingBlockInfo> entry = iter.next();
@@ -213,8 +217,7 @@ class PendingReplicationBlocks {
             synchronized (timedOutItems) {
               timedOutItems.add(block);
             }
-            FSNamesystem.LOG.warn(
-                "PendingReplicationMonitor timed out block " + block);
+            LOG.warn("PendingReplicationMonitor timed out " + block);
             iter.remove();
           }
         }

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Fri Jul 29 07:10:48 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -64,7 +65,7 @@ import org.apache.hadoop.util.VersionInf
 @InterfaceAudience.Private
 public class JspHelper {
   public static final String CURRENT_CONF = "current.conf";
-  final static public String WEB_UGI_PROPERTY_NAME = "dfs.web.ugi";
+  final static public String WEB_UGI_PROPERTY_NAME = DFSConfigKeys.DFS_WEB_UGI_KEY;
   public static final String DELEGATION_PARAMETER_NAME = "delegation";
   public static final String NAMENODE_ADDRESS = "nnaddr";
   static final String SET_DELEGATION = "&" + DELEGATION_PARAMETER_NAME +

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Fri Jul 29 07:10:48 2011
@@ -206,7 +206,7 @@ public abstract class Storage extends St
    * One of the storage directories.
    */
   @InterfaceAudience.Private
-  public class StorageDirectory {
+  public static class StorageDirectory {
     final File root;              // root directory
     final boolean useLock;        // flag to enable storage lock
     final StorageDirType dirType; // storage dir type
@@ -247,75 +247,11 @@ public abstract class Storage extends St
      */
     public StorageDirType getStorageDirType() {
       return dirType;
-    }
-    
-    /**
-     * Read version file.
-     * 
-     * @throws IOException if file cannot be read or contains inconsistent data
-     */
-    public void read() throws IOException {
-      read(getVersionFile());
-    }
-    public void read(File from) throws IOException {
-      Properties props = readFrom(from);
-      getFields(props, this);
-    }
-    
-    public Properties readFrom(File from) throws IOException {
-      RandomAccessFile file = new RandomAccessFile(from, "rws");
-      FileInputStream in = null;
-      Properties props = new Properties();
-      try {
-        in = new FileInputStream(file.getFD());
-        file.seek(0);
-        props.load(in);
-      } finally {
-        if (in != null) {
-          in.close();
-        }
-        file.close();
-      }
-      return props;
-    }
+    }    
 
-    /**
-     * Write version file.
-     * 
-     * @throws IOException
-     */
-    public void write() throws IOException {
-      write(getVersionFile());
-    }
-
-    public void write(File to) throws IOException {
-      Properties props = new Properties();
-      setFields(props, this);
-      RandomAccessFile file = new RandomAccessFile(to, "rws");
-      FileOutputStream out = null;
-      try {
-        file.seek(0);
-        out = new FileOutputStream(file.getFD());
-        /*
-         * If server is interrupted before this line, 
-         * the version file will remain unchanged.
-         */
-        props.store(out, null);
-        /*
-         * Now the new fields are flushed to the head of the file, but file 
-         * length can still be larger then required and therefore the file can 
-         * contain whole or corrupted fields from its old contents in the end.
-         * If server is interrupted here and restarted later these extra fields
-         * either should not effect server behavior or should be handled
-         * by the server correctly.
-         */
-        file.setLength(out.getChannel().position());
-      } finally {
-        if (out != null) {
-          out.close();
-        }
-        file.close();
-      }
+    public void read(File from, Storage storage) throws IOException {
+      Properties props = readPropertiesFile(from);
+      storage.setFieldsFromProperties(props, this);
     }
 
     /**
@@ -467,7 +403,8 @@ public abstract class Storage extends St
      * consistent and cannot be recovered.
      * @throws IOException
      */
-    public StorageState analyzeStorage(StartupOption startOpt) throws IOException {
+    public StorageState analyzeStorage(StartupOption startOpt, Storage storage)
+        throws IOException {
       assert root != null : "root is null";
       String rootPath = root.getCanonicalPath();
       try { // check that storage exists
@@ -499,8 +436,9 @@ public abstract class Storage extends St
 
       if (startOpt == HdfsConstants.StartupOption.FORMAT)
         return StorageState.NOT_FORMATTED;
+
       if (startOpt != HdfsConstants.StartupOption.IMPORT) {
-        checkOldLayoutStorage(this);
+        storage.checkOldLayoutStorage(this);
       }
 
       // check whether current directory is valid
@@ -807,9 +745,8 @@ public abstract class Storage extends St
    * @param props
    * @throws IOException
    */
-  protected void getFields(Properties props, 
-                           StorageDirectory sd 
-                           ) throws IOException {
+  protected void setFieldsFromProperties(
+      Properties props, StorageDirectory sd) throws IOException {
     setLayoutVersion(props, sd);
     setNamespaceID(props, sd);
     setStorageType(props, sd);
@@ -818,15 +755,14 @@ public abstract class Storage extends St
   }
   
   /**
-   * Set common storage fields.
+   * Set common storage fields into the given properties object.
    * Should be overloaded if additional fields need to be set.
    * 
-   * @param props
-   * @throws IOException
+   * @param props the Properties object to write into
    */
-  protected void setFields(Properties props, 
-                           StorageDirectory sd 
-                           ) throws IOException {
+  protected void setPropertiesFromFields(Properties props, 
+                                         StorageDirectory sd)
+      throws IOException {
     props.setProperty("layoutVersion", String.valueOf(layoutVersion));
     props.setProperty("storageType", storageType.toString());
     props.setProperty("namespaceID", String.valueOf(namespaceID));
@@ -837,6 +773,77 @@ public abstract class Storage extends St
     props.setProperty("cTime", String.valueOf(cTime));
   }
 
+  /**
+   * Read properties from the VERSION file in the given storage directory.
+   */
+  public void readProperties(StorageDirectory sd) throws IOException {
+    Properties props = readPropertiesFile(sd.getVersionFile());
+    setFieldsFromProperties(props, sd);
+  }
+
+  /**
+   * Read properties from the previous/VERSION file in the given storage directory.
+   */
+  public void readPreviousVersionProperties(StorageDirectory sd)
+      throws IOException {
+    Properties props = readPropertiesFile(sd.getPreviousVersionFile());
+    setFieldsFromProperties(props, sd);
+  }
+
+  /**
+   * Write properties to the VERSION file in the given storage directory.
+   */
+  public void writeProperties(StorageDirectory sd) throws IOException {
+    writeProperties(sd.getVersionFile(), sd);
+  }
+
+  public void writeProperties(File to, StorageDirectory sd) throws IOException {
+    Properties props = new Properties();
+    setPropertiesFromFields(props, sd);
+    RandomAccessFile file = new RandomAccessFile(to, "rws");
+    FileOutputStream out = null;
+    try {
+      file.seek(0);
+      out = new FileOutputStream(file.getFD());
+      /*
+       * If server is interrupted before this line, 
+       * the version file will remain unchanged.
+       */
+      props.store(out, null);
+      /*
+       * Now the new fields are flushed to the head of the file, but file 
+       * length can still be larger than required and therefore the file can 
+       * contain whole or corrupted fields from its old contents in the end.
+       * If server is interrupted here and restarted later these extra fields
+       * either should not affect server behavior or should be handled
+       * by the server correctly.
+       */
+      file.setLength(out.getChannel().position());
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+      file.close();
+    }
+  }
+  
+  public static Properties readPropertiesFile(File from) throws IOException {
+    RandomAccessFile file = new RandomAccessFile(from, "rws");
+    FileInputStream in = null;
+    Properties props = new Properties();
+    try {
+      in = new FileInputStream(file.getFD());
+      file.seek(0);
+      props.load(in);
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+      file.close();
+    }
+    return props;
+  }
+
   public static void rename(File from, File to) throws IOException {
     if (!from.renameTo(to))
       throw new IOException("Failed to rename " 
@@ -861,7 +868,7 @@ public abstract class Storage extends St
   public void writeAll() throws IOException {
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
     for (Iterator<StorageDirectory> it = storageDirs.iterator(); it.hasNext();) {
-      it.next().write();
+      writeProperties(it.next());
     }
   }
 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Fri Jul 29 07:10:48 2011
@@ -102,7 +102,7 @@ public class BlockPoolSliceStorage exten
       StorageDirectory sd = new StorageDirectory(dataDir, null, false);
       StorageState curState;
       try {
-        curState = sd.analyzeStorage(startOpt);
+        curState = sd.analyzeStorage(startOpt, this);
         // sd is locked but not opened
         switch (curState) {
         case NORMAL:
@@ -176,7 +176,7 @@ public class BlockPoolSliceStorage exten
     this.namespaceID = nsInfo.getNamespaceID();
     this.blockpoolID = nsInfo.getBlockPoolID();
     this.storageType = NodeType.DATA_NODE;
-    bpSdir.write();
+    writeProperties(bpSdir);
   }
 
   /**
@@ -184,7 +184,7 @@ public class BlockPoolSliceStorage exten
    * VERSION file
    */
   @Override
-  protected void setFields(Properties props, StorageDirectory sd)
+  protected void setPropertiesFromFields(Properties props, StorageDirectory sd)
       throws IOException {
     props.setProperty("layoutVersion", String.valueOf(layoutVersion));
     props.setProperty("namespaceID", String.valueOf(namespaceID));
@@ -208,7 +208,7 @@ public class BlockPoolSliceStorage exten
   }
   
   @Override
-  protected void getFields(Properties props, StorageDirectory sd)
+  protected void setFieldsFromProperties(Properties props, StorageDirectory sd)
       throws IOException {
     setLayoutVersion(props, sd);
     setNamespaceID(props, sd);
@@ -237,7 +237,7 @@ public class BlockPoolSliceStorage exten
     if (startOpt == StartupOption.ROLLBACK)
       doRollback(sd, nsInfo); // rollback if applicable
     
-    sd.read();
+    readProperties(sd);
     checkVersionUpgradable(this.layoutVersion);
     assert this.layoutVersion >= FSConstants.LAYOUT_VERSION 
        : "Future version is not allowed";
@@ -331,7 +331,7 @@ public class BlockPoolSliceStorage exten
     assert this.namespaceID == nsInfo.getNamespaceID() 
         : "Data-node and name-node layout versions must be the same.";
     this.cTime = nsInfo.getCTime();
-    bpSd.write();
+    writeProperties(bpSd);
     
     // 4.rename <SD>/curernt/<bpid>/previous.tmp to <SD>/curernt/<bpid>/previous
     rename(bpTmpDir, bpPrevDir);
@@ -383,8 +383,7 @@ public class BlockPoolSliceStorage exten
       return;
     // read attributes out of the VERSION file of previous directory
     DataStorage prevInfo = new DataStorage();
-    StorageDirectory prevSD = prevInfo.new StorageDirectory(bpSd.getRoot());
-    prevSD.read(prevSD.getPreviousVersionFile());
+    prevInfo.readPreviousVersionProperties(bpSd);
 
     // We allow rollback to a state, which is either consistent with
     // the namespace state or can be further upgraded to it.
@@ -392,7 +391,7 @@ public class BlockPoolSliceStorage exten
     // && ( DN.previousCTime <= NN.ctime)
     if (!(prevInfo.getLayoutVersion() >= FSConstants.LAYOUT_VERSION && 
         prevInfo.getCTime() <= nsInfo.getCTime())) { // cannot rollback
-      throw new InconsistentFSStateException(prevSD.getRoot(),
+      throw new InconsistentFSStateException(bpSd.getRoot(),
           "Cannot rollback to a newer state.\nDatanode previous state: LV = "
               + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime()
               + " is newer than the namespace state: LV = "

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jul 29 07:10:48 2011
@@ -18,32 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 import java.io.BufferedOutputStream;
@@ -448,8 +423,11 @@ public class DataNode extends Configured
       name = config.get(DFS_DATANODE_HOST_NAME_KEY);
     }
     if (name == null) {
-      name = DNS.getDefaultHost(config.get("dfs.datanode.dns.interface",
-          "default"), config.get("dfs.datanode.dns.nameserver", "default"));
+      name = DNS
+          .getDefaultHost(config.get(DFS_DATANODE_DNS_INTERFACE_KEY,
+              DFS_DATANODE_DNS_INTERFACE_DEFAULT), config.get(
+              DFS_DATANODE_DNS_NAMESERVER_KEY,
+              DFS_DATANODE_DNS_NAMESERVER_DEFAULT));
     }
     return name;
   }
@@ -521,7 +499,7 @@ public class DataNode extends Configured
   }
   
   private void startPlugins(Configuration conf) {
-    plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class);
+    plugins = conf.getInstances(DFS_DATANODE_PLUGINS_KEY, ServicePlugin.class);
     for (ServicePlugin p: plugins) {
       try {
         p.start(this);
@@ -810,8 +788,9 @@ public class DataNode extends Configured
       StartupOption startOpt = getStartupOption(conf);
       assert startOpt != null : "Startup option must be set.";
 
-      boolean simulatedFSDataset = 
-        conf.getBoolean("dfs.datanode.simulateddatastorage", false);
+      boolean simulatedFSDataset = conf.getBoolean(
+          DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
+          DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
       
       if (simulatedFSDataset) {
         initFsDataSet(conf, dataDirs);
@@ -1455,8 +1434,9 @@ public class DataNode extends Configured
     }
 
     // get version and id info from the name-node
-    boolean simulatedFSDataset = 
-      conf.getBoolean("dfs.datanode.simulateddatastorage", false);
+    boolean simulatedFSDataset = conf.getBoolean(
+        DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
+        DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
 
     if (simulatedFSDataset) {
       storage.createStorageID(getPort());
@@ -1480,8 +1460,8 @@ public class DataNode extends Configured
    * Determine the http server's effective addr
    */
   public static InetSocketAddress getInfoAddr(Configuration conf) {
-    return NetUtils.createSocketAddr(
-        conf.get("dfs.datanode.http.address", "0.0.0.0:50075"));
+    return NetUtils.createSocketAddr(conf.get(DFS_DATANODE_HTTP_ADDRESS_KEY,
+        DFS_DATANODE_HTTP_ADDRESS_DEFAULT));
   }
   
   private void registerMXBean() {
@@ -2258,11 +2238,11 @@ public class DataNode extends Configured
   }
 
   private static void setStartupOption(Configuration conf, StartupOption opt) {
-    conf.set("dfs.datanode.startup", opt.toString());
+    conf.set(DFS_DATANODE_STARTUP_KEY, opt.toString());
   }
 
   static StartupOption getStartupOption(Configuration conf) {
-    return StartupOption.valueOf(conf.get("dfs.datanode.startup",
+    return StartupOption.valueOf(conf.get(DFS_DATANODE_STARTUP_KEY,
                                           StartupOption.REGULAR.toString()));
   }
 
@@ -2661,7 +2641,7 @@ public class DataNode extends Configured
   // Determine a Datanode's streaming address
   public static InetSocketAddress getStreamingAddr(Configuration conf) {
     return NetUtils.createSocketAddr(
-        conf.get("dfs.datanode.address", "0.0.0.0:50010"));
+        conf.get(DFS_DATANODE_ADDRESS_KEY, DFS_DATANODE_ADDRESS_DEFAULT));
   }
   
   @Override // DataNodeMXBean
@@ -2672,7 +2652,7 @@ public class DataNode extends Configured
   @Override // DataNodeMXBean
   public String getRpcPort(){
     InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
-        this.getConf().get("dfs.datanode.ipc.address"));
+        this.getConf().get(DFS_DATANODE_IPC_ADDRESS_KEY));
     return Integer.toString(ipcAddr.getPort());
   }
 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Fri Jul 29 07:10:48 2011
@@ -56,7 +56,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.StringUtils;
 
 /** 
  * Data storage information file.
@@ -153,7 +152,7 @@ public class DataStorage extends Storage
       StorageDirectory sd = new StorageDirectory(dataDir);
       StorageState curState;
       try {
-        curState = sd.analyzeStorage(startOpt);
+        curState = sd.analyzeStorage(startOpt, this);
         // sd is locked but not opened
         switch(curState) {
         case NORMAL:
@@ -274,7 +273,7 @@ public class DataStorage extends Storage
     this.namespaceID = nsInfo.getNamespaceID();
     this.cTime = 0;
     // store storageID as it currently is
-    sd.write();
+    writeProperties(sd);
   }
 
   /*
@@ -282,7 +281,7 @@ public class DataStorage extends Storage
    * DataStorage VERSION file
   */
   @Override
-  protected void setFields(Properties props, 
+  protected void setPropertiesFromFields(Properties props, 
                            StorageDirectory sd 
                            ) throws IOException {
     props.setProperty("storageType", storageType.toString());
@@ -301,7 +300,7 @@ public class DataStorage extends Storage
    * DataStorage VERSION file and verify them.
    */
   @Override
-  protected void getFields(Properties props, StorageDirectory sd)
+  protected void setFieldsFromProperties(Properties props, StorageDirectory sd)
       throws IOException {
     setLayoutVersion(props, sd);
     setcTime(props, sd);
@@ -373,7 +372,7 @@ public class DataStorage extends Storage
     if (startOpt == StartupOption.ROLLBACK) {
       doRollback(sd, nsInfo); // rollback if applicable
     }
-    sd.read();
+    readProperties(sd);
     checkVersionUpgradable(this.layoutVersion);
     assert this.layoutVersion >= FSConstants.LAYOUT_VERSION :
       "Future version is not allowed";
@@ -448,7 +447,7 @@ public class DataStorage extends Storage
     if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
       clusterID = nsInfo.getClusterID();
       layoutVersion = nsInfo.getLayoutVersion();
-      sd.write();
+      writeProperties(sd);
       return;
     }
     
@@ -485,7 +484,7 @@ public class DataStorage extends Storage
     // 4. Write version file under <SD>/current
     layoutVersion = FSConstants.LAYOUT_VERSION;
     clusterID = nsInfo.getClusterID();
-    sd.write();
+    writeProperties(sd);
     
     // 5. Rename <SD>/previous.tmp to <SD>/previous
     rename(tmpDir, prevDir);
@@ -539,14 +538,13 @@ public class DataStorage extends Storage
     if (!prevDir.exists())
       return;
     DataStorage prevInfo = new DataStorage();
-    StorageDirectory prevSD = prevInfo.new StorageDirectory(sd.getRoot());
-    prevSD.read(prevSD.getPreviousVersionFile());
+    prevInfo.readPreviousVersionProperties(sd);
 
     // We allow rollback to a state, which is either consistent with
     // the namespace state or can be further upgraded to it.
     if (!(prevInfo.getLayoutVersion() >= FSConstants.LAYOUT_VERSION
           && prevInfo.getCTime() <= nsInfo.getCTime()))  // cannot rollback
-      throw new InconsistentFSStateException(prevSD.getRoot(),
+      throw new InconsistentFSStateException(sd.getRoot(),
           "Cannot rollback to a newer state.\nDatanode previous state: LV = "
               + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime()
               + " is newer than the namespace state: LV = "

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Fri Jul 29 07:10:48 2011
@@ -54,7 +54,6 @@ import org.apache.hadoop.util.Daemon;
 @InterfaceAudience.Private
 public class DirectoryScanner implements Runnable {
   private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
-  private static final int DEFAULT_SCAN_INTERVAL = 21600;
 
   private final DataNode datanode;
   private final FSDataset dataset;
@@ -225,7 +224,7 @@ public class DirectoryScanner implements
     this.datanode = dn;
     this.dataset = dataset;
     int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
-        DEFAULT_SCAN_INTERVAL);
+        DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
     scanPeriodMsecs = interval * 1000L; //msec
     int threads = 
         conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,