You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/06 20:32:07 UTC
svn commit: r1143523 [1/4] - in /hadoop/common/branches/HDFS-1073/hdfs: ./
src/c++/libhdfs/ src/contrib/hdfsproxy/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/protocol/datatransfe...
Author: todd
Date: Wed Jul 6 18:32:04 2011
New Revision: 1143523
URL: http://svn.apache.org/viewvc?rev=1143523&view=rev
Log:
Merge trunk into HDFS-1073. Some tests are failing due to the merge; they will be addressed in a follow-up commit.
Added:
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/
- copied from r1143516, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/
- copied from r1143516, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecondaryWebUi.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecondaryWebUi.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestValidateConfigurationSettings.java
- copied unchanged from r1143516, hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestValidateConfigurationSettings.java
Removed:
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CorruptReplicasMap.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptReplicaInfo.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDatanodeDescriptor.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestPendingReplication.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestUnderReplicatedBlocks.java
Modified:
hadoop/common/branches/HDFS-1073/hdfs/ (props changed)
hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt
hadoop/common/branches/HDFS-1073/hdfs/src/c++/libhdfs/ (props changed)
hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/ (props changed)
hadoop/common/branches/HDFS-1073/hdfs/src/java/ (props changed)
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/ (props changed)
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestQuota.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHost2NodesMap.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/net/TestNetworkTopology.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
hadoop/common/branches/HDFS-1073/hdfs/src/webapps/datanode/ (props changed)
hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/ (props changed)
hadoop/common/branches/HDFS-1073/hdfs/src/webapps/hdfs/block_info_xml.jsp
hadoop/common/branches/HDFS-1073/hdfs/src/webapps/secondary/ (props changed)
Propchange: hadoop/common/branches/HDFS-1073/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jul 6 18:32:04 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs:1134994-1138149
+/hadoop/common/trunk/hdfs:1134994-1143516
/hadoop/core/branches/branch-0.19/hdfs:713112
/hadoop/hdfs/branches/HDFS-1052:987665-1095512
/hadoop/hdfs/branches/HDFS-265:796829-820463
Modified: hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/CHANGES.txt Wed Jul 6 18:32:04 2011
@@ -291,6 +291,8 @@ Trunk (unreleased changes)
(todd)
HDFS-2055. Add hflush support to libhdfs. (Travis Crawford via eli)
+
+ HDFS-2083. Query JMX statistics over http via JMXJsonServlet. (tanping)
IMPROVEMENTS
@@ -519,6 +521,25 @@ Trunk (unreleased changes)
HDFS-1568. Improve the log messages in DataXceiver. (Joey Echeverria via
szetszwo)
+ HDFS-2100. Improve TestStorageRestore. (atm)
+
+ HDFS-2092. Remove some object references to Configuration in DFSClient.
+ (Bharath Mundlapudi via szetszwo)
+
+ HDFS-2087. Declare methods in DataTransferProtocol interface, and change
+ Sender and Receiver to implement the interface. (szetszwo)
+
+ HDFS-1723. quota errors messages should use the same scale. (Jim Plush via
+ atm)
+
+ HDFS-2110. StreamFile and ByteRangeInputStream cleanup. (eli)
+
+ HDFS-2107. Move block management code from o.a.h.h.s.namenode to a new
+ package o.a.h.h.s.blockmanagement. (szetszwo)
+
+ HDFS-2109. Store uMask as member variable to DFSClient.Conf. (Bharath
+ Mundlapudi via szetszwo)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -534,8 +555,16 @@ Trunk (unreleased changes)
HDFS-2056. Update fetchdt usage. (Tanping Wang via jitendra)
+ HDFS-2118. Couple dfs data dir improvements. (eli)
+
BUG FIXES
+ HDFS-2011. Removal and restoration of storage directories on checkpointing
+ failure doesn't work properly. (Ravi Prakash via mattf)
+
+ HDFS-1955. FSImage.doUpgrade() was made too fault-tolerant by HDFS-1826.
+ (mattf)
+
HDFS-2061. Two minor bugs in BlockManager block report processing. (mattf)
HDFS-1449. Fix test failures - ExtendedBlock must return
@@ -762,6 +791,23 @@ Trunk (unreleased changes)
HDFS-1734. 'Chunk size to view' option is not working in Name Node UI.
(Uma Maheswara Rao G via jitendra)
+ HDFS-2086. If the include hosts list contains host names, after restarting
+ namenode, data nodes registration is denied. Contributed by Tanping Wang.
+
+ HDFS-2082. SecondaryNameNode web interface doesn't show the right info. (atm)
+
+ HDFS-1321. If service port and main port are the same, there is no clear
+ log message explaining the issue. (Jim Plush via atm)
+
+ HDFS-1381. HDFS javadocs hard-code references to dfs.namenode.name.dir and
+ dfs.datanode.data.dir parameters (Jim Plush via atm)
+
+ HDFS-2053. Bug in INodeDirectory#computeContentSummary warning.
+ (Michael Noll via eli)
+
+ HDFS-1990. Fix resource leaks in BlockReceiver.close(). (Uma Maheswara
+ Rao G via szetszwo)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
@@ -819,6 +865,8 @@ Release 0.22.0 - Unreleased
HDFS-528. Add ability for safemode to wait for a minimum number of
live datanodes (Todd Lipcon via eli)
+ HDFS-1753. Resource Leak in StreamFile. (Uma Maheswara Rao G via eli)
+
IMPROVEMENTS
HDFS-1304. Add a new unit test for HftpFileSystem.open(..). (szetszwo)
Propchange: hadoop/common/branches/HDFS-1073/hdfs/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jul 6 18:32:04 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs/src/c++/libhdfs:1134994-1138149
+/hadoop/common/trunk/hdfs/src/c++/libhdfs:1134994-1143516
/hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
/hadoop/core/trunk/src/c++/libhdfs:776175-784663
/hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs:987665-1095512
Propchange: hadoop/common/branches/HDFS-1073/hdfs/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jul 6 18:32:04 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs/src/contrib/hdfsproxy:1134994-1138149
+/hadoop/common/trunk/hdfs/src/contrib/hdfsproxy:1134994-1143516
/hadoop/core/branches/branch-0.19/hdfs/src/contrib/hdfsproxy:713112
/hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
/hadoop/hdfs/branches/HDFS-1052/src/contrib/hdfsproxy:987665-1095512
Propchange: hadoop/common/branches/HDFS-1073/hdfs/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jul 6 18:32:04 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hdfs/src/java:1134994-1138149
+/hadoop/common/trunk/hdfs/src/java:1134994-1143516
/hadoop/core/branches/branch-0.19/hdfs/src/java:713112
/hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
/hadoop/hdfs/branches/HDFS-1052/src/java:987665-1095512
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java Wed Jul 6 18:32:04 2011
@@ -404,10 +404,9 @@ public class BlockReader extends FSInput
String clientName)
throws IOException {
// in and out will be closed when sock is closed (by the caller)
- Sender.opReadBlock(
- new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
- block, startOffset, len, clientName, blockToken);
+ final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+ NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));
+ new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
//
// Get bytes in block, set streams
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java Wed Jul 6 18:32:04 2011
@@ -26,7 +26,6 @@ import java.net.URL;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
-
/**
* To support HTTP byte streams, a new connection to an HTTP server needs to be
* created each time. This class hides the complexity of those multiple
@@ -60,7 +59,9 @@ class ByteRangeInputStream extends FSInp
}
}
-
+ enum StreamStatus {
+ NORMAL, SEEK
+ }
protected InputStream in;
protected URLOpener originalURL;
protected URLOpener resolvedURL;
@@ -68,9 +69,7 @@ class ByteRangeInputStream extends FSInp
protected long currentPos = 0;
protected long filelength;
- protected int status = STATUS_SEEK;
- protected static final int STATUS_NORMAL = 0;
- protected static final int STATUS_SEEK = 1;
+ StreamStatus status = StreamStatus.SEEK;
ByteRangeInputStream(final URL url) {
this(new URLOpener(url), new URLOpener(null));
@@ -82,18 +81,19 @@ class ByteRangeInputStream extends FSInp
}
private InputStream getInputStream() throws IOException {
- if (status != STATUS_NORMAL) {
+ if (status != StreamStatus.NORMAL) {
if (in != null) {
in.close();
in = null;
}
- // use the original url if no resolved url exists (e.g., if it's
- // the first time a request is made)
- final URLOpener o = resolvedURL.getURL() == null? originalURL: resolvedURL;
+ // Use the original url if no resolved url exists, eg. if
+ // it's the first time a request is made.
+ final URLOpener opener =
+ (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
- final HttpURLConnection connection = o.openConnection();
+ final HttpURLConnection connection = opener.openConnection();
try {
connection.setRequestMethod("GET");
if (startPos != 0) {
@@ -101,36 +101,35 @@ class ByteRangeInputStream extends FSInp
}
connection.connect();
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
- filelength = cl == null? -1: Long.parseLong(cl);
+ filelength = (cl == null) ? -1 : Long.parseLong(cl);
if (HftpFileSystem.LOG.isDebugEnabled()) {
HftpFileSystem.LOG.debug("filelength = " + filelength);
}
in = connection.getInputStream();
- } catch(IOException ioe) {
+ } catch (IOException ioe) {
HftpFileSystem.throwIOExceptionFromConnection(connection, ioe);
}
- if (startPos != 0 && connection.getResponseCode() != 206) {
- // we asked for a byte range but did not receive a partial content
+ int respCode = connection.getResponseCode();
+ if (startPos != 0 && respCode != HttpURLConnection.HTTP_PARTIAL) {
+ // We asked for a byte range but did not receive a partial content
// response...
- throw new IOException("206 expected, but received "
- + connection.getResponseCode());
- } else if(startPos == 0 && connection.getResponseCode() != 200) {
- // we asked for all bytes from the beginning but didn't receive a 200
+ throw new IOException("HTTP_PARTIAL expected, received " + respCode);
+ } else if (startPos == 0 && respCode != HttpURLConnection.HTTP_OK) {
+ // We asked for all bytes from the beginning but didn't receive a 200
// response (none of the other 2xx codes are valid here)
- throw new IOException("200 expected, but received "
- + connection.getResponseCode());
+ throw new IOException("HTTP_OK expected, received " + respCode);
}
-
+
resolvedURL.setURL(connection.getURL());
- status = STATUS_NORMAL;
+ status = StreamStatus.NORMAL;
}
return in;
}
- private void update(final boolean isEOF, final int n
- ) throws IOException {
+ private void update(final boolean isEOF, final int n)
+ throws IOException {
if (!isEOF) {
currentPos += n;
} else if (currentPos < filelength) {
@@ -154,7 +153,7 @@ class ByteRangeInputStream extends FSInp
if (pos != currentPos) {
startPos = pos;
currentPos = pos;
- status = STATUS_SEEK;
+ status = StreamStatus.SEEK;
}
}
@@ -162,7 +161,7 @@ class ByteRangeInputStream extends FSInp
* Return the current offset from the start of the file
*/
public long getPos() throws IOException {
- return currentPos; // keep total count?
+ return currentPos;
}
/**
@@ -172,7 +171,4 @@ class ByteRangeInputStream extends FSInp
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
-
-}
-
-
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java Wed Jul 6 18:32:04 2011
@@ -128,17 +128,87 @@ public class DFSClient implements FSCons
static Random r = new Random();
final String clientName;
Configuration conf;
- long defaultBlockSize;
- private short defaultReplication;
SocketFactory socketFactory;
- int socketTimeout;
- final int writePacketSize;
final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
final FileSystem.Statistics stats;
final int hdfsTimeout; // timeout value for a DFS operation.
final LeaseRenewer leaserenewer;
-
final SocketCache socketCache;
+ final Conf dfsClientConf;
+
+ /**
+ * DFSClient configuration
+ */
+ static class Conf {
+ final int maxBlockAcquireFailures;
+ final int confTime;
+ final int ioBufferSize;
+ final int bytesPerChecksum;
+ final int writePacketSize;
+ final int socketTimeout;
+ final int socketCacheCapacity;
+ /** Wait time window (in msec) if BlockMissingException is caught */
+ final int timeWindow;
+ final int nCachedConnRetry;
+ final int nBlockWriteRetry;
+ final int nBlockWriteLocateFollowingRetry;
+ final long defaultBlockSize;
+ final long prefetchSize;
+ final short defaultReplication;
+ final String taskId;
+ final FsPermission uMask;
+
+ Conf(Configuration conf) {
+ maxBlockAcquireFailures = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
+ DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
+ confTime = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
+ HdfsConstants.WRITE_TIMEOUT);
+ ioBufferSize = conf.getInt(
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+ bytesPerChecksum = conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+ DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
+ socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+ HdfsConstants.READ_TIMEOUT);
+ /** dfs.write.packet.size is an internal config variable */
+ writePacketSize = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+ defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
+ DEFAULT_BLOCK_SIZE);
+ defaultReplication = (short) conf.getInt(
+ DFSConfigKeys.DFS_REPLICATION_KEY,
+ DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+ taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
+ socketCacheCapacity = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
+ DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+ prefetchSize = conf.getLong(
+ DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
+ 10 * defaultBlockSize);
+ timeWindow = conf
+ .getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
+ nCachedConnRetry = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
+ DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
+ nBlockWriteRetry = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
+ DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
+ nBlockWriteLocateFollowingRetry = conf
+ .getInt(
+ DFSConfigKeys
+ .DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
+ DFSConfigKeys
+ .DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
+ uMask = FsPermission.getUMask(conf);
+ }
+ }
+
+ Conf getConf() {
+ return dfsClientConf;
+ }
/**
* A map from file names to {@link DFSOutputStream} objects
@@ -257,16 +327,11 @@ public class DFSClient implements FSCons
DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
+ // Copy only the required DFSClient configuration
+ this.dfsClientConf = new Conf(conf);
this.conf = conf;
this.stats = stats;
- this.socketTimeout =
- conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
- HdfsConstants.READ_TIMEOUT);
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
- // dfs.write.packet.size is an internal config variable
- this.writePacketSize =
- conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
- DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
// The hdfsTimeout is currently the same as the ipc timeout
@@ -275,19 +340,8 @@ public class DFSClient implements FSCons
final String authority = nameNodeAddr == null? "null":
nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this);
-
- String taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
- this.clientName = leaserenewer.getClientName(taskId);
-
- defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
- defaultReplication = (short)
- conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
- DFSConfigKeys.DFS_REPLICATION_DEFAULT);
-
- this.socketCache = new SocketCache(
- conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
- DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT));
-
+ this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
+ this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
if (nameNodeAddr != null && rpcNamenode == null) {
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
this.namenode = createNamenode(this.rpcNamenode);
@@ -306,8 +360,7 @@ public class DFSClient implements FSCons
* to retrieve block locations when reading.
*/
int getMaxBlockAcquireFailures() {
- return conf.getInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
- DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
+ return dfsClientConf.maxBlockAcquireFailures;
}
/**
@@ -315,18 +368,14 @@ public class DFSClient implements FSCons
* @param numNodes the number of nodes in the pipeline.
*/
int getDatanodeWriteTimeout(int numNodes) {
- int confTime =
- conf.getInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
- HdfsConstants.WRITE_TIMEOUT);
-
- return (confTime > 0) ?
- (confTime + HdfsConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
+ return (dfsClientConf.confTime > 0) ?
+ (dfsClientConf.confTime + HdfsConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
}
int getDatanodeReadTimeout(int numNodes) {
- return socketTimeout > 0 ?
+ return dfsClientConf.socketTimeout > 0 ?
(HdfsConstants.READ_TIMEOUT_EXTENSION * numNodes +
- socketTimeout) : 0;
+ dfsClientConf.socketTimeout) : 0;
}
int getHdfsTimeout() {
@@ -430,7 +479,7 @@ public class DFSClient implements FSCons
* @return the default block size in bytes
*/
public long getDefaultBlockSize() {
- return defaultBlockSize;
+ return dfsClientConf.defaultBlockSize;
}
/**
@@ -528,7 +577,7 @@ public class DFSClient implements FSCons
}
public short getDefaultReplication() {
- return defaultReplication;
+ return dfsClientConf.defaultReplication;
}
/**
@@ -583,7 +632,7 @@ public class DFSClient implements FSCons
public DFSInputStream open(String src)
throws IOException, UnresolvedLinkException {
- return open(src, conf.getInt("io.file.buffer.size", 4096), true, null);
+ return open(src, dfsClientConf.ioBufferSize, true, null);
}
/**
@@ -629,7 +678,8 @@ public class DFSClient implements FSCons
*/
public OutputStream create(String src, boolean overwrite)
throws IOException {
- return create(src, overwrite, defaultReplication, defaultBlockSize, null);
+ return create(src, overwrite, dfsClientConf.defaultReplication,
+ dfsClientConf.defaultBlockSize, null);
}
/**
@@ -639,7 +689,8 @@ public class DFSClient implements FSCons
public OutputStream create(String src,
boolean overwrite,
Progressable progress) throws IOException {
- return create(src, overwrite, defaultReplication, defaultBlockSize, progress);
+ return create(src, overwrite, dfsClientConf.defaultReplication,
+ dfsClientConf.defaultBlockSize, progress);
}
/**
@@ -660,7 +711,7 @@ public class DFSClient implements FSCons
public OutputStream create(String src, boolean overwrite, short replication,
long blockSize, Progressable progress) throws IOException {
return create(src, overwrite, replication, blockSize, progress,
- conf.getInt("io.file.buffer.size", 4096));
+ dfsClientConf.ioBufferSize);
}
/**
@@ -740,14 +791,13 @@ public class DFSClient implements FSCons
if (permission == null) {
permission = FsPermission.getDefault();
}
- FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
+ FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + masked);
}
- final DFSOutputStream result = new DFSOutputStream(this, src, masked,
- flag, createParent, replication, blockSize, progress, buffersize,
- conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
- DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+ final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
+ createParent, replication, blockSize, progress, buffersize,
+ dfsClientConf.bytesPerChecksum);
leaserenewer.put(src, result, this);
return result;
}
@@ -808,7 +858,7 @@ public class DFSClient implements FSCons
throws IOException {
try {
FsPermission dirPerm =
- FsPermission.getDefault().applyUMask(FsPermission.getUMask(conf));
+ FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
namenode.createSymlink(target, link, dirPerm, createParent);
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
@@ -851,8 +901,7 @@ public class DFSClient implements FSCons
UnresolvedPathException.class);
}
return new DFSOutputStream(this, src, buffersize, progress,
- lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
- DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+ lastBlock, stat, dfsClientConf.bytesPerChecksum);
}
/**
@@ -1061,7 +1110,7 @@ public class DFSClient implements FSCons
*/
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
checkOpen();
- return getFileChecksum(src, namenode, socketFactory, socketTimeout);
+ return getFileChecksum(src, namenode, socketFactory, dfsClientConf.socketTimeout);
}
/**
@@ -1117,7 +1166,7 @@ public class DFSClient implements FSCons
+ Op.BLOCK_CHECKSUM + ", block=" + block);
}
// get block MD5
- Sender.opBlockChecksum(out, block, lb.getBlockToken());
+ new Sender(out).blockChecksum(block, lb.getBlockToken());
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
@@ -1377,7 +1426,7 @@ public class DFSClient implements FSCons
if (permission == null) {
permission = FsPermission.getDefault();
}
- FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
+ FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + masked);
}
@@ -1404,7 +1453,7 @@ public class DFSClient implements FSCons
checkOpen();
if (absPermission == null) {
absPermission =
- FsPermission.getDefault().applyUMask(FsPermission.getUMask(conf));
+ FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
}
if(LOG.isDebugEnabled()) {
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Jul 6 18:32:04 2011
@@ -83,7 +83,7 @@ public class DFSInputStream extends FSIn
* capped at maxBlockAcquireFailures
*/
private int failures = 0;
- private int timeWindow = 3000; // wait time window (in msec) if BlockMissingException is caught
+ private int timeWindow;
/* XXX Use of CocurrentHashMap is temp fix. Need to fix
* parallel accesses to DFSInputStream (through ptreads) properly */
@@ -106,13 +106,9 @@ public class DFSInputStream extends FSIn
this.buffersize = buffersize;
this.src = src;
this.socketCache = dfsClient.socketCache;
- prefetchSize = this.dfsClient.conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
- 10 * dfsClient.defaultBlockSize);
- timeWindow = this.dfsClient.conf.getInt(
- DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
- nCachedConnRetry = this.dfsClient.conf.getInt(
- DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
- DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
+ prefetchSize = dfsClient.getConf().prefetchSize;
+ timeWindow = dfsClient.getConf().timeWindow;
+ nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
openInfo();
}
@@ -163,7 +159,7 @@ public class DFSInputStream extends FSIn
try {
cdp = DFSClient.createClientDatanodeProtocolProxy(
- datanode, dfsClient.conf, dfsClient.socketTimeout, locatedblock);
+ datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, locatedblock);
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@@ -771,8 +767,8 @@ public class DFSInputStream extends FSIn
// disaster.
sock.setTcpNoDelay(true);
- NetUtils.connect(sock, dnAddr, dfsClient.socketTimeout);
- sock.setSoTimeout(dfsClient.socketTimeout);
+ NetUtils.connect(sock, dnAddr, dfsClient.getConf().socketTimeout);
+ sock.setSoTimeout(dfsClient.getConf().socketTimeout);
}
try {
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Jul 6 18:32:04 2011
@@ -103,7 +103,6 @@ import org.apache.hadoop.util.StringUtil
****************************************************************/
class DFSOutputStream extends FSOutputSummer implements Syncable {
private final DFSClient dfsClient;
- private Configuration conf;
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
private Socket s;
// closed is accessed by different threads under different locks.
@@ -355,7 +354,7 @@ class DFSOutputStream extends FSOutputSu
// that expected size of of a packet, then create
// smaller size packet.
//
- computePacketChunkSize(Math.min(dfsClient.writePacketSize, freeInLastBlock),
+ computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock),
bytesPerChecksum);
}
@@ -426,8 +425,8 @@ class DFSOutputStream extends FSOutputSu
&& dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
stage == BlockConstructionStage.DATA_STREAMING &&
- now - lastPacket < dfsClient.socketTimeout/2)) || doSleep ) {
- long timeout = dfsClient.socketTimeout/2 - (now-lastPacket);
+ now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
+ long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
timeout : 1000;
@@ -847,8 +846,8 @@ class DFSOutputStream extends FSOutputSu
DataNode.SMALL_BUFFER_SIZE));
//send the TRANSFER_BLOCK request
- Sender.opTransferBlock(out, block,
- dfsClient.clientName, targets, blockToken);
+ new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
+ targets);
//ack
in = new DataInputStream(NetUtils.getInputStream(sock));
@@ -953,8 +952,7 @@ class DFSOutputStream extends FSOutputSu
private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
LocatedBlock lb = null;
DatanodeInfo[] nodes = null;
- int count = conf.getInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
- DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
+ int count = dfsClient.getConf().nBlockWriteRetry;
boolean success = false;
do {
hasError = false;
@@ -1021,10 +1019,9 @@ class DFSOutputStream extends FSOutputSu
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
// send the request
- Sender.opWriteBlock(out, block,
- nodes.length, recoveryFlag ? stage.getRecoveryStage() : stage, newGS,
- block.getNumBytes(), bytesSent, dfsClient.clientName, null, nodes,
- accessToken);
+ new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
+ nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
+ nodes.length, block.getNumBytes(), bytesSent, newGS);
checksum.writeHeader(out);
out.flush();
@@ -1079,9 +1076,7 @@ class DFSOutputStream extends FSOutputSu
private LocatedBlock locateFollowingBlock(long start,
DatanodeInfo[] excludedNodes)
throws IOException, UnresolvedLinkException {
- int retries =
- conf.getInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
- DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
+ int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
long sleeptime = 400;
while (true) {
long localstart = System.currentTimeMillis();
@@ -1201,7 +1196,6 @@ class DFSOutputStream extends FSOutputSu
int bytesPerChecksum, short replication) throws IOException {
super(new PureJavaCrc32(), bytesPerChecksum, 4);
this.dfsClient = dfsClient;
- this.conf = dfsClient.conf;
this.src = src;
this.blockSize = blockSize;
this.blockReplication = replication;
@@ -1232,7 +1226,7 @@ class DFSOutputStream extends FSOutputSu
throws IOException {
this(dfsClient, src, blockSize, progress, bytesPerChecksum, replication);
- computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
+ computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
try {
dfsClient.namenode.create(
@@ -1269,7 +1263,7 @@ class DFSOutputStream extends FSOutputSu
bytesCurBlock = lastBlock.getBlockSize();
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
} else {
- computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
+ computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
streamer = new DataStreamer();
}
streamer.start();
@@ -1385,7 +1379,7 @@ class DFSOutputStream extends FSOutputSu
}
if (!appendChunk) {
- int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.writePacketSize);
+ int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
computePacketChunkSize(psize, bytesPerChecksum);
}
//
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java Wed Jul 6 18:32:04 2011
@@ -70,9 +70,8 @@ import org.xml.sax.XMLReader;
import org.xml.sax.helpers.DefaultHandler;
import org.xml.sax.helpers.XMLReaderFactory;
-
-
-/** An implementation of a protocol for accessing filesystems over HTTP.
+/**
+ * An implementation of a protocol for accessing filesystems over HTTP.
* The following implementation provides a limited, read-only interface
* to a filesystem over HTTP.
* @see org.apache.hadoop.hdfs.server.namenode.ListPathsServlet
@@ -245,17 +244,26 @@ public class HftpFileSystem extends File
}
}
-
- /*
- Construct URL pointing to file on namenode
- */
- URL getNamenodeFileURL(Path f) throws IOException {
- return getNamenodeURL("/data" + f.toUri().getPath(), "ugi=" + getUgiParameter());
+ /**
+ * Return a URL pointing to given path on the namenode.
+ *
+ * @param p path to obtain the URL for
+ * @return namenode URL referring to the given path
+ * @throws IOException on error constructing the URL
+ */
+ URL getNamenodeFileURL(Path p) throws IOException {
+ return getNamenodeURL("/data" + p.toUri().getPath(),
+ "ugi=" + getUgiParameter());
}
- /*
- Construct URL pointing to namenode.
- */
+ /**
+ * Return a namenode URL for the given path and query string.
+ *
+ * @param path path to obtain the URL for
+ * @param query string to append to the path
+ * @return namenode URL referring to the given path
+ * @throws IOException on error constructing the URL
+ */
URL getNamenodeURL(String path, String query) throws IOException {
try {
final URL url = new URI("http", null, nnAddr.getHostName(),
@@ -305,7 +313,7 @@ public class HftpFileSystem extends File
try {
connection.setRequestMethod("GET");
connection.connect();
- } catch(IOException ioe) {
+ } catch (IOException ioe) {
throwIOExceptionFromConnection(connection, ioe);
}
return connection;
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java Wed Jul 6 18:32:04 2011
@@ -41,7 +41,8 @@ public class DSQuotaExceededException ex
String msg = super.getMessage();
if (msg == null) {
return "The DiskSpace quota" + (pathName==null?"":(" of " + pathName)) +
- " is exceeded: quota=" + quota + " diskspace consumed=" + StringUtils.humanReadableInt(count);
+ " is exceeded: quota=" + StringUtils.humanReadableInt(quota) +
+ " diskspace consumed=" + StringUtils.humanReadableInt(count);
} else {
return msg;
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Wed Jul 6 18:32:04 2011
@@ -17,10 +17,16 @@
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
+import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
/**
* Transfer data to/from datanode using a streaming protocol.
@@ -35,8 +41,101 @@ public interface DataTransferProtocol {
* when protocol changes. It is not very obvious.
*/
/*
- * Version 27:
- * Move DataTransferProtocol and the inner classes to a package.
+ * Version 28:
+ * Declare methods in DataTransferProtocol interface.
+ */
+ public static final int DATA_TRANSFER_VERSION = 28;
+
+ /**
+ * Read a block.
+ *
+ * @param blk the block being read.
+ * @param blockToken security token for accessing the block.
+ * @param clientName client's name.
+ * @param blockOffset offset of the block.
+ * @param length maximum number of bytes for this read.
+ */
+ public void readBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final long blockOffset,
+ final long length) throws IOException;
+
+ /**
+ * Write a block to a datanode pipeline.
+ *
+ * @param blk the block being written.
+ * @param blockToken security token for accessing the block.
+ * @param clientName client's name.
+ * @param targets target datanodes in the pipeline.
+ * @param source source datanode.
+ * @param stage pipeline stage.
+ * @param pipelineSize the size of the pipeline.
+ * @param minBytesRcvd minimum number of bytes received.
+ * @param maxBytesRcvd maximum number of bytes received.
+ * @param latestGenerationStamp the latest generation stamp of the block.
+ */
+ public void writeBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final DatanodeInfo[] targets,
+ final DatanodeInfo source,
+ final BlockConstructionStage stage,
+ final int pipelineSize,
+ final long minBytesRcvd,
+ final long maxBytesRcvd,
+ final long latestGenerationStamp) throws IOException;
+
+ /**
+ * Transfer a block to another datanode.
+ * The block stage must be
+ * either {@link BlockConstructionStage#TRANSFER_RBW}
+ * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
+ *
+ * @param blk the block being transferred.
+ * @param blockToken security token for accessing the block.
+ * @param clientName client's name.
+ * @param targets target datanodes.
+ */
+ public void transferBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final DatanodeInfo[] targets) throws IOException;
+
+ /**
+ * Receive a block from a source datanode
+ * and then notify the namenode
+ * to remove the copy from the original datanode.
+ * Note that the source datanode and the original datanode can be different.
+ * It is used for balancing purposes.
+ *
+ * @param blk the block being replaced.
+ * @param blockToken security token for accessing the block.
+ * @param delHint the hint for deleting the block in the original datanode.
+ * @param source the source datanode for receiving the block.
+ */
+ public void replaceBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String delHint,
+ final DatanodeInfo source) throws IOException;
+
+ /**
+ * Copy a block.
+ * It is used for balancing purposes.
+ *
+ * @param blk the block being copied.
+ * @param blockToken security token for accessing the block.
+ */
+ public void copyBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException;
+
+ /**
+ * Get block checksum (MD5 of CRC32).
+ *
+ * @param blk a block.
+ * @param blockToken security token for accessing the block.
+ * @throws IOException
*/
- public static final int DATA_TRANSFER_VERSION = 27;
+ public void blockChecksum(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException;
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Wed Jul 6 18:32:04 2011
@@ -27,23 +27,26 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.security.token.Token;
/** Receiver */
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public abstract class Receiver {
+public abstract class Receiver implements DataTransferProtocol {
+ protected final DataInputStream in;
+
+ /** Create a receiver for DataTransferProtocol with an input stream. */
+ protected Receiver(final DataInputStream in) {
+ this.in = in;
+ }
+
/** Read an Op. It also checks protocol version. */
- protected final Op readOp(DataInputStream in) throws IOException {
+ protected final Op readOp() throws IOException {
final short version = in.readShort();
if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
throw new IOException( "Version Mismatch (Expected: " +
@@ -54,11 +57,10 @@ public abstract class Receiver {
}
/** Process op by the corresponding method. */
- protected final void processOp(Op op, DataInputStream in
- ) throws IOException {
+ protected final void processOp(Op op) throws IOException {
switch(op) {
case READ_BLOCK:
- opReadBlock(in);
+ opReadBlock();
break;
case WRITE_BLOCK:
opWriteBlock(in);
@@ -81,121 +83,60 @@ public abstract class Receiver {
}
/** Receive OP_READ_BLOCK */
- private void opReadBlock(DataInputStream in) throws IOException {
+ private void opReadBlock() throws IOException {
OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
-
- ExtendedBlock b = fromProto(
- proto.getHeader().getBaseHeader().getBlock());
- Token<BlockTokenIdentifier> token = fromProto(
- proto.getHeader().getBaseHeader().getToken());
-
- opReadBlock(in, b, proto.getOffset(), proto.getLen(),
- proto.getHeader().getClientName(), token);
- }
- /**
- * Abstract OP_READ_BLOCK method. Read a block.
- */
- protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk,
- long offset, long length, String client,
- Token<BlockTokenIdentifier> blockToken) throws IOException;
+ readBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
+ fromProto(proto.getHeader().getBaseHeader().getToken()),
+ proto.getHeader().getClientName(),
+ proto.getOffset(),
+ proto.getLen());
+ }
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
- opWriteBlock(in,
- fromProto(proto.getHeader().getBaseHeader().getBlock()),
- proto.getPipelineSize(),
- fromProto(proto.getStage()),
- proto.getLatestGenerationStamp(),
- proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
+ writeBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
+ fromProto(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
- fromProto(proto.getSource()),
fromProtos(proto.getTargetsList()),
- fromProto(proto.getHeader().getBaseHeader().getToken()));
+ fromProto(proto.getSource()),
+ fromProto(proto.getStage()),
+ proto.getPipelineSize(),
+ proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
+ proto.getLatestGenerationStamp());
}
- /**
- * Abstract OP_WRITE_BLOCK method.
- * Write a block.
- */
- protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk,
- int pipelineSize, BlockConstructionStage stage, long newGs,
- long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
- DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
- throws IOException;
-
/** Receive {@link Op#TRANSFER_BLOCK} */
private void opTransferBlock(DataInputStream in) throws IOException {
final OpTransferBlockProto proto =
OpTransferBlockProto.parseFrom(vintPrefixed(in));
-
- opTransferBlock(in,
- fromProto(proto.getHeader().getBaseHeader().getBlock()),
+ transferBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
+ fromProto(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
- fromProtos(proto.getTargetsList()),
- fromProto(proto.getHeader().getBaseHeader().getToken()));
+ fromProtos(proto.getTargetsList()));
}
- /**
- * Abstract {@link Op#TRANSFER_BLOCK} method.
- * For {@link BlockConstructionStage#TRANSFER_RBW}
- * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
- */
- protected abstract void opTransferBlock(DataInputStream in, ExtendedBlock blk,
- String client, DatanodeInfo[] targets,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException;
-
/** Receive OP_REPLACE_BLOCK */
private void opReplaceBlock(DataInputStream in) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
-
- opReplaceBlock(in,
- fromProto(proto.getHeader().getBlock()),
+ replaceBlock(fromProto(proto.getHeader().getBlock()),
+ fromProto(proto.getHeader().getToken()),
proto.getDelHint(),
- fromProto(proto.getSource()),
- fromProto(proto.getHeader().getToken()));
+ fromProto(proto.getSource()));
}
- /**
- * Abstract OP_REPLACE_BLOCK method.
- * It is used for balancing purpose; send to a destination
- */
- protected abstract void opReplaceBlock(DataInputStream in,
- ExtendedBlock blk, String delHint, DatanodeInfo src,
- Token<BlockTokenIdentifier> blockToken) throws IOException;
-
/** Receive OP_COPY_BLOCK */
private void opCopyBlock(DataInputStream in) throws IOException {
OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
-
- opCopyBlock(in,
- fromProto(proto.getHeader().getBlock()),
+ copyBlock(fromProto(proto.getHeader().getBlock()),
fromProto(proto.getHeader().getToken()));
}
- /**
- * Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to
- * a proxy source.
- */
- protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException;
-
/** Receive OP_BLOCK_CHECKSUM */
private void opBlockChecksum(DataInputStream in) throws IOException {
OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
- opBlockChecksum(in,
- fromProto(proto.getHeader().getBlock()),
+ blockChecksum(fromProto(proto.getHeader().getBlock()),
fromProto(proto.getHeader().getToken()));
}
-
- /**
- * Abstract OP_BLOCK_CHECKSUM method.
- * Get the checksum of a block
- */
- protected abstract void opBlockChecksum(DataInputStream in,
- ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
- throws IOException;
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Wed Jul 6 18:32:04 2011
@@ -44,7 +44,14 @@ import com.google.protobuf.Message;
/** Sender */
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class Sender {
+public class Sender implements DataTransferProtocol {
+ private final DataOutputStream out;
+
+ /** Create a sender for DataTransferProtocol with an output stream. */
+ public Sender(final DataOutputStream out) {
+ this.out = out;
+ }
+
/** Initialize a operation. */
private static void op(final DataOutput out, final Op op
) throws IOException {
@@ -59,79 +66,85 @@ public class Sender {
out.flush();
}
- /** Send OP_READ_BLOCK */
- public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
- long blockOffset, long blockLen, String clientName,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException {
+ @Override
+ public void readBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final long blockOffset,
+ final long length) throws IOException {
OpReadBlockProto proto = OpReadBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
.setOffset(blockOffset)
- .setLen(blockLen)
+ .setLen(length)
.build();
send(out, Op.READ_BLOCK, proto);
}
- /** Send OP_WRITE_BLOCK */
- public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
- int pipelineSize, BlockConstructionStage stage, long newGs,
- long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
- DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
- throws IOException {
- ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(blk, client,
- blockToken);
+ @Override
+ public void writeBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final DatanodeInfo[] targets,
+ final DatanodeInfo source,
+ final BlockConstructionStage stage,
+ final int pipelineSize,
+ final long minBytesRcvd,
+ final long maxBytesRcvd,
+ final long latestGenerationStamp) throws IOException {
+ ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
+ blk, clientName, blockToken);
OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
.setHeader(header)
- .addAllTargets(
- toProtos(targets, 1))
+ .addAllTargets(toProtos(targets, 1))
.setStage(toProto(stage))
.setPipelineSize(pipelineSize)
.setMinBytesRcvd(minBytesRcvd)
.setMaxBytesRcvd(maxBytesRcvd)
- .setLatestGenerationStamp(newGs);
+ .setLatestGenerationStamp(latestGenerationStamp);
- if (src != null) {
- proto.setSource(toProto(src));
+ if (source != null) {
+ proto.setSource(toProto(source));
}
send(out, Op.WRITE_BLOCK, proto.build());
}
- /** Send {@link Op#TRANSFER_BLOCK} */
- public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
- String client, DatanodeInfo[] targets,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
+ @Override
+ public void transferBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final DatanodeInfo[] targets) throws IOException {
OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(
- blk, client, blockToken))
+ blk, clientName, blockToken))
.addAllTargets(toProtos(targets, 0))
.build();
send(out, Op.TRANSFER_BLOCK, proto);
}
- /** Send OP_REPLACE_BLOCK */
- public static void opReplaceBlock(DataOutputStream out,
- ExtendedBlock blk, String delHint, DatanodeInfo src,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
+ @Override
+ public void replaceBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String delHint,
+ final DatanodeInfo source) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
.setDelHint(delHint)
- .setSource(toProto(src))
+ .setSource(toProto(source))
.build();
send(out, Op.REPLACE_BLOCK, proto);
}
- /** Send OP_COPY_BLOCK */
- public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException {
+ @Override
+ public void copyBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException {
OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
.build();
@@ -139,10 +152,9 @@ public class Sender {
send(out, Op.COPY_BLOCK, proto);
}
- /** Send OP_BLOCK_CHECKSUM */
- public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException {
+ @Override
+ public void blockChecksum(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException {
OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
.build();
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Wed Jul 6 18:32:04 2011
@@ -61,10 +61,10 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
-import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.io.IOUtils;
@@ -348,8 +348,8 @@ public class Balancer {
private void sendRequest(DataOutputStream out) throws IOException {
final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
- Sender.opReplaceBlock(out, eb, source.getStorageID(),
- proxySource.getDatanode(), accessToken);
+ new Sender(out).replaceBlock(eb, accessToken,
+ source.getStorageID(), proxySource.getDatanode());
}
/* Receive a block copy response from the input stream */
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Wed Jul 6 18:32:04 2011
@@ -26,8 +26,8 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URL;
import java.net.URLEncoder;
-import java.util.Arrays;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -45,14 +45,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.http.HtmlQuoting;
import org.apache.hadoop.io.Text;
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Jul 6 18:32:04 2011
@@ -231,6 +231,9 @@ class BlockReceiver implements Closeable
} catch(IOException e) {
ioe = e;
}
+ finally {
+ IOUtils.closeStream(checksumOut);
+ }
// close block file
try {
if (out != null) {
@@ -244,6 +247,9 @@ class BlockReceiver implements Closeable
} catch (IOException e) {
ioe = e;
}
+ finally{
+ IOUtils.closeStream(out);
+ }
// disk check
if(ioe != null) {
datanode.checkDiskError(ioe);
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1143523&r1=1143522&r2=1143523&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Jul 6 18:32:04 2011
@@ -1977,8 +1977,8 @@ public class DataNode extends Configured
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
}
- Sender.opWriteBlock(out,
- b, 0, stage, 0, 0, 0, clientname, srcNode, targets, accessToken);
+ new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
+ stage, 0, 0, 0, 0);
// send data & checksum
blockSender.sendBlock(out, baseStream, null);
@@ -2186,20 +2186,21 @@ public class DataNode extends Configured
continue;
}
// drop any (illegal) authority in the URI for backwards compatibility
- File data = new File(dirURI.getPath());
+ File dir = new File(dirURI.getPath());
try {
- DiskChecker.checkDir(localFS, new Path(data.toURI()), permission);
- dirs.add(data);
- } catch (IOException e) {
- LOG.warn("Invalid directory in: "
- + DFS_DATANODE_DATA_DIR_KEY + ": ", e);
- invalidDirs.append("\"").append(data.getCanonicalPath()).append("\" ");
+ DiskChecker.checkDir(localFS, new Path(dir.toURI()), permission);
+ dirs.add(dir);
+ } catch (IOException ioe) {
+ LOG.warn("Invalid " + DFS_DATANODE_DATA_DIR_KEY + " "
+ + dir + " : ", ioe);
+ invalidDirs.append("\"").append(dir.getCanonicalPath()).append("\" ");
}
}
- if (dirs.size() == 0)
+ if (dirs.size() == 0) {
throw new IOException("All directories in "
+ DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
+ invalidDirs);
+ }
return dirs;
}