Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/04/20 23:00:48 UTC
svn commit: r1095512 [1/3] - in /hadoop/hdfs/branches/HDFS-1052: ./
src/c++/libhdfs/ src/contrib/ src/contrib/hdfsproxy/ src/java/
src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/hdfs/
src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/ap...
Author: suresh
Date: Wed Apr 20 21:00:45 2011
New Revision: 1095512
URL: http://svn.apache.org/viewvc?rev=1095512&view=rev
Log:
Merging changes r1090113:r1095461 from trunk to federation
Added:
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/CLITestCmdDFS.java
- copied unchanged from r1095461, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/CLITestCmdDFS.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/CLITestHelperDFS.java
- copied unchanged from r1095461, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/CLITestHelperDFS.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/util/
- copied from r1095461, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/util/
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/util/CLICommandDFSAdmin.java
- copied unchanged from r1095461, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/util/CLICommandDFSAdmin.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestResolveHdfsSymlink.java
- copied unchanged from r1095461, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/TestResolveHdfsSymlink.java
Removed:
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java
Modified:
hadoop/hdfs/branches/HDFS-1052/ (props changed)
hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
hadoop/hdfs/branches/HDFS-1052/build.xml (props changed)
hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs/ (props changed)
hadoop/hdfs/branches/HDFS-1052/src/contrib/build.xml
hadoop/hdfs/branches/HDFS-1052/src/contrib/hdfsproxy/ (props changed)
hadoop/hdfs/branches/HDFS-1052/src/java/ (props changed)
hadoop/hdfs/branches/HDFS-1052/src/java/hdfs-default.xml
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/fs/Hdfs.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (props changed)
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/ (props changed)
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
hadoop/hdfs/branches/HDFS-1052/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
hadoop/hdfs/branches/HDFS-1052/src/webapps/datanode/ (props changed)
hadoop/hdfs/branches/HDFS-1052/src/webapps/hdfs/ (props changed)
hadoop/hdfs/branches/HDFS-1052/src/webapps/secondary/ (props changed)
Propchange: hadoop/hdfs/branches/HDFS-1052/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 21:00:45 2011
@@ -1,4 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs:713112
/hadoop/hdfs/branches/HDFS-265:796829-820463
/hadoop/hdfs/branches/branch-0.21:820487
-/hadoop/hdfs/trunk:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036738,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696
+/hadoop/hdfs/trunk:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036738,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696,1090114-1095461
Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Wed Apr 20 21:00:45 2011
@@ -258,6 +258,16 @@ Trunk (unreleased changes)
HDFS-1813. Federation: Authentication using BlockToken in RPC to datanode
fails. (jitendra)
+ HDFS-1630. Support fsedits checksum. (hairong)
+
+ HDFS-1606. Provide a stronger data guarantee in the write pipeline by
+ adding a new datanode when an existing datanode failed. (szetszwo)
+
+ HDFS-1442. Api to get delegation token in Hdfs class. (jitendra)
+
+ HDFS-1070. Speedup namenode image loading and saving by storing only
+ local file names. (hairong)
+
IMPROVEMENTS
HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)
@@ -327,6 +337,25 @@ Trunk (unreleased changes)
HDFS-1767. Namenode ignores non-initial block report from datanodes
when in safemode during startup. (Matt Foley via suresh)
+ HDFS-1817. Move pipeline_Fi_[39-51] from TestFiDataTransferProtocol
+ to TestFiPipelineClose. (szetszwo)
+
+ HDFS-1760. In FSDirectory.getFullPathName(..), it is better to return "/"
+ for root directory instead of an empty string. (Daryn Sharp via szetszwo)
+
+ HDFS-1833. Reduce repeated string constructions and unnecessary fields,
+ and fix comments in BlockReceiver.PacketResponder. (szetszwo)
+
+ HDFS-1486. Generalize CLITest structure and interfaces to facilitate
+ upstream adoption (e.g. for web testing). (cos)
+
+ HDFS-1844. Move "fs -help" shell command tests from HDFS to COMMON; see
+ also HADOOP-7230. (Daryn Sharp via szetszwo)
+
+ HDFS-1840. In DFSClient, terminate the lease renewing thread when all files
+ being written are closed for a grace period, and start a new thread when
+ new files are opened for write. (szetszwo)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -397,6 +426,18 @@ Trunk (unreleased changes)
HDFS-1543. Reduce dev. cycle time by moving system testing artifacts from
default build and push to maven for HDFS (Luke Lu via cos)
+ HDFS-1818. TestHDFSCLI is failing on trunk after HADOOP-7202.
+ (Aaron T. Myers via todd)
+
+ HDFS-1828. TestBlocksWithNotEnoughRacks intermittently fails assert.
+ (Matt Foley via eli)
+
+ HDFS-1824. delay instantiation of file system object until it is
+ needed (linked to HADOOP-7207) (boryas)
+
+ HDFS-1831. Fix append bug in FileContext and implement CreateFlag
+ check (related to HADOOP-7223). (suresh)
+
Release 0.22.0 - Unreleased
NEW FEATURES
@@ -853,9 +894,20 @@ Release 0.21.1 - Unreleased
HDFS-1781. Fix the path for jsvc in bin/hdfs. (John George via szetszwo)
- HDFS-1782. Fix an NPE in RFSNamesystem.startFileInternal(..).
+ HDFS-1782. Fix an NPE in FSNamesystem.startFileInternal(..).
(John George via szetszwo)
+ HDFS-1821. Fix username resolution in NameNode.createSymlink(..) and
+ FSDirectory.addSymlink(..). (John George via szetszwo)
+
+ HDFS-1806. TestBlockReport.blockReport_08() and _09() are timing-dependent
+ and likely to fail on fast servers. (Matt Foley via eli)
+
+ HDFS-1845. Symlink comes up as directory after namenode restart.
+ (John George via eli)
+
+ HDFS-1666. Disable failing hdfsproxy test TestAuthorizationFilter (todd)
+
Release 0.21.1 - Unreleased
HDFS-1411. Correct backup node startup command in hdfs user guide.
Propchange: hadoop/hdfs/branches/HDFS-1052/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 21:00:45 2011
@@ -2,4 +2,4 @@
/hadoop/core/trunk/build.xml:779102
/hadoop/hdfs/branches/HDFS-265/build.xml:796829-820463
/hadoop/hdfs/branches/branch-0.21/build.xml:820487
-/hadoop/hdfs/trunk/build.xml:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696
+/hadoop/hdfs/trunk/build.xml:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696,1090114-1095461
Propchange: hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 21:00:45 2011
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
/hadoop/core/trunk/src/c++/libhdfs:776175-784663
-/hadoop/hdfs/trunk/src/c++/libhdfs:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696
+/hadoop/hdfs/trunk/src/c++/libhdfs:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696,1090114-1095461
Modified: hadoop/hdfs/branches/HDFS-1052/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/contrib/build.xml?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/contrib/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/contrib/build.xml Wed Apr 20 21:00:45 2011
@@ -46,9 +46,11 @@
<!-- Test all the contribs. -->
<!-- ====================================================== -->
<target name="test">
+ <!-- hdfsproxy tests failing due to HDFS-1666
<subant target="test">
<fileset dir="." includes="hdfsproxy/build.xml"/>
</subant>
+ -->
</target>
Propchange: hadoop/hdfs/branches/HDFS-1052/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 21:00:45 2011
@@ -2,4 +2,4 @@
/hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
/hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy:820487
-/hadoop/hdfs/trunk/src/contrib/hdfsproxy:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696,1090114-1095461
Propchange: hadoop/hdfs/branches/HDFS-1052/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 21:00:45 2011
@@ -2,4 +2,4 @@
/hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
/hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/java:820487
-/hadoop/hdfs/trunk/src/java:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696
+/hadoop/hdfs/trunk/src/java:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696,1090114-1095461
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/hdfs-default.xml?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/hdfs-default.xml (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/hdfs-default.xml Wed Apr 20 21:00:45 2011
@@ -318,6 +318,42 @@ creations/deletions), or "all".</descrip
</property>
<property>
+ <name>dfs.client.block.write.replace-datanode-on-failure.enable</name>
+ <value>true</value>
+ <description>
+ If there is a datanode/network failure in the write pipeline,
+ DFSClient will try to remove the failed datanode from the pipeline
+ and then continue writing with the remaining datanodes. As a result,
+ the number of datanodes in the pipeline is decreased. This feature
+ adds new datanodes back to the pipeline.
+
+ This is a site-wide property to enable/disable the feature.
+
+ See also dfs.client.block.write.replace-datanode-on-failure.policy
+ </description>
+</property>
+
+<property>
+ <name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
+ <value>DEFAULT</value>
+ <description>
+ This property is used only if the value of
+ dfs.client.block.write.replace-datanode-on-failure.enable is true.
+
+ ALWAYS: always add a new datanode when an existing datanode is removed.
+
+ NEVER: never add a new datanode.
+
+ DEFAULT:
+ Let r be the replication number.
+ Let n be the number of existing datanodes.
+ Add a new datanode only if r is greater than or equal to 3 and either
+ (1) floor(r/2) is greater than or equal to n; or
+ (2) r is greater than n and the block is hflushed/appended.
+ </description>
+</property>
+
+<property>
<name>dfs.blockreport.intervalMsec</name>
<value>21600000</value>
<description>Determines block reporting interval in milliseconds.</description>
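For reference, a minimal sketch of how a client might set these two keys and resolve the effective policy; it uses the DFSConfigKeys constants and the ReplaceDatanodeOnFailure.get(..) helper introduced later in this commit, and the class name is illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.ReplaceDatanodeOnFailure;

public class ReplaceDatanodePolicyExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Enable the feature (the site default) and pick the ALWAYS policy
    // instead of DEFAULT.
    conf.setBoolean(
        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
        true);
    conf.set(
        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
        "ALWAYS");
    // Resolve the effective setting, as DFSClient does at construction time.
    final ReplaceDatanodeOnFailure policy = ReplaceDatanodeOnFailure.get(conf);
    System.out.println("effective policy = " + policy);
  }
}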
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/fs/Hdfs.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/fs/Hdfs.java Wed Apr 20 21:00:45 2011
@@ -25,6 +25,8 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.List;
+import java.util.NoSuchElementException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -37,8 +39,13 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.util.Progressable;
@InterfaceAudience.Private
@@ -249,7 +256,7 @@ public class Hdfs extends AbstractFileSy
if (hasNext()) {
return thisListing.getPartialListing()[i++];
}
- throw new java.util.NoSuchElementException("No more entry in " + src);
+ throw new NoSuchElementException("No more entry in " + src);
}
}
@@ -384,4 +391,43 @@ public class Hdfs extends AbstractFileSy
public Path getLinkTarget(Path p) throws IOException {
return new Path(dfs.getLinkTarget(getUriPath(p)));
}
+
+ @Override //AbstractFileSystem
+ public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
+ Token<DelegationTokenIdentifier> result = dfs
+ .getDelegationToken(renewer == null ? null : new Text(renewer));
+ result.setService(new Text(this.getCanonicalServiceName()));
+ List<Token<?>> tokenList = new ArrayList<Token<?>>();
+ tokenList.add(result);
+ return tokenList;
+ }
+
+ /**
+ * Renew an existing delegation token.
+ *
+ * @param token delegation token obtained earlier
+ * @return the new expiration time
+ * @throws InvalidToken
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public long renewDelegationToken(
+ Token<? extends AbstractDelegationTokenIdentifier> token)
+ throws InvalidToken, IOException {
+ return dfs.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
+ }
+
+ /**
+ * Cancel an existing delegation token.
+ *
+ * @param token delegation token
+ * @throws InvalidToken
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public void cancelDelegationToken(
+ Token<? extends AbstractDelegationTokenIdentifier> token)
+ throws InvalidToken, IOException {
+ dfs.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
+ }
}
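For reference, a compilable sketch of a caller exercising the new delegation token methods above: obtain a token through getDelegationTokens(..), renew it, then cancel it. The surrounding class, the renewer name, and the round-trip flow are illustrative only.

import java.util.List;

import org.apache.hadoop.fs.Hdfs;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.token.Token;

public class HdfsDelegationTokenExample {
  @SuppressWarnings("unchecked")
  static void roundTrip(Hdfs fs, String renewer) throws Exception {
    // Hdfs.getDelegationTokens(..) returns a single-element list.
    final List<Token<?>> tokens = fs.getDelegationTokens(renewer);
    final Token<DelegationTokenIdentifier> token =
        (Token<DelegationTokenIdentifier>) tokens.get(0);

    final long newExpiration = fs.renewDelegationToken(token);
    System.out.println("token now expires at " + newExpiration);

    fs.cancelDelegationToken(token);  // invalidate the token when done
  }
}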
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java Wed Apr 20 21:00:45 2011
@@ -45,6 +45,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -128,15 +129,16 @@ public class DFSClient implements FSCons
private volatile long serverDefaultsLastUpdate;
static Random r = new Random();
final String clientName;
- final LeaseChecker leasechecker = new LeaseChecker();
Configuration conf;
long defaultBlockSize;
private short defaultReplication;
SocketFactory socketFactory;
int socketTimeout;
final int writePacketSize;
+ final DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
final FileSystem.Statistics stats;
final int hdfsTimeout; // timeout value for a DFS operation.
+ final LeaseChecker leasechecker;
/**
* The locking hierarchy is to first acquire lock on DFSClient object, followed by
@@ -248,8 +250,11 @@ public class DFSClient implements FSCons
this.writePacketSize =
conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+ this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
+
// The hdfsTimeout is currently the same as the ipc timeout
this.hdfsTimeout = Client.getTimeout(conf);
+ this.leasechecker = new LeaseChecker(hdfsTimeout);
this.ugi = UserGroupInformation.getCurrentUser();
@@ -570,8 +575,9 @@ public class DFSClient implements FSCons
int buffersize)
throws IOException {
return create(src, FsPermission.getDefault(),
- overwrite ? EnumSet.of(CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE),
- replication, blockSize, progress, buffersize);
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
+ buffersize);
}
/**
@@ -637,9 +643,29 @@ public class DFSClient implements FSCons
}
/**
+ * Append to an existing file if {@link CreateFlag#APPEND} is present
+ */
+ private OutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag,
+ int buffersize, Progressable progress) throws IOException {
+ if (flag.contains(CreateFlag.APPEND)) {
+ HdfsFileStatus stat = getFileInfo(src);
+ if (stat == null) { // No file to append to
+ // New file needs to be created if create option is present
+ if (!flag.contains(CreateFlag.CREATE)) {
+ throw new FileNotFoundException("failed to append to non-existent file "
+ + src + " on client " + clientName);
+ }
+ return null;
+ }
+ return callAppend(stat, src, buffersize, progress);
+ }
+ return null;
+ }
+
+ /**
* Same as {{@link #create(String, FsPermission, EnumSet, short, long,
* Progressable, int)} except that the permission
- * is absolute (ie has already been masked with umask.
+ * is absolute (i.e., it has already been masked with umask).
*/
public OutputStream primitiveCreate(String src,
FsPermission absPermission,
@@ -652,9 +678,13 @@ public class DFSClient implements FSCons
int bytesPerChecksum)
throws IOException, UnresolvedLinkException {
checkOpen();
- OutputStream result = new DFSOutputStream(this, src, absPermission,
- flag, createParent, replication, blockSize, progress, buffersize,
- bytesPerChecksum);
+ CreateFlag.validate(flag);
+ OutputStream result = primitiveAppend(src, flag, buffersize, progress);
+ if (result == null) {
+ result = new DFSOutputStream(this, src, absPermission,
+ flag, createParent, replication, blockSize, progress, buffersize,
+ bytesPerChecksum);
+ }
leasechecker.put(src, result);
return result;
}
@@ -696,23 +726,11 @@ public class DFSClient implements FSCons
}
}
- /**
- * Append to an existing HDFS file.
- *
- * @param src file name
- * @param buffersize buffer size
- * @param progress for reporting write-progress
- * @return an output stream for writing into the file
- *
- * @see ClientProtocol#append(String, String)
- */
- OutputStream append(String src, int buffersize, Progressable progress)
- throws IOException {
- checkOpen();
- HdfsFileStatus stat = null;
+ /** Method to get stream returned by append call */
+ private OutputStream callAppend(HdfsFileStatus stat, String src,
+ int buffersize, Progressable progress) throws IOException {
LocatedBlock lastBlock = null;
try {
- stat = getFileInfo(src);
lastBlock = namenode.append(src, clientName);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
@@ -722,9 +740,26 @@ public class DFSClient implements FSCons
UnsupportedOperationException.class,
UnresolvedPathException.class);
}
- OutputStream result = new DFSOutputStream(this, src, buffersize, progress,
+ return new DFSOutputStream(this, src, buffersize, progress,
lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+ }
+
+ /**
+ * Append to an existing HDFS file.
+ *
+ * @param src file name
+ * @param buffersize buffer size
+ * @param progress for reporting write-progress
+ * @return an output stream for writing into the file
+ *
+ * @see ClientProtocol#append(String, String)
+ */
+ OutputStream append(String src, int buffersize, Progressable progress)
+ throws IOException {
+ checkOpen();
+ HdfsFileStatus stat = getFileInfo(src);
+ OutputStream result = callAppend(stat, src, buffersize, progress);
leasechecker.put(src, result);
return result;
}
@@ -1325,38 +1360,106 @@ public class DFSClient implements FSCons
}
}
- boolean isLeaseCheckerStarted() {
- return leasechecker.daemon != null;
- }
-
/** Lease management*/
- class LeaseChecker implements Runnable {
+ class LeaseChecker {
+ static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+ static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
/** A map from src -> DFSOutputStream of files that are currently being
* written by this client.
*/
private final SortedMap<String, OutputStream> pendingCreates
= new TreeMap<String, OutputStream>();
+ /** The time in milliseconds when the map became empty. */
+ private long emptyTime = Long.MAX_VALUE;
+ /** A fixed lease renewal time period in milliseconds */
+ private final long renewal;
+ /** A daemon for renewing lease */
private Daemon daemon = null;
-
+ /** Only the daemon with currentId should run. */
+ private int currentId = 0;
+
+ /**
+ * The period in milliseconds for which the lease renewer thread keeps
+ * running after the map becomes empty.
+ * If the map is empty for a time period longer than the grace period,
+ * the renewer should terminate.
+ */
+ private long gracePeriod;
+ /**
+ * The time period in milliseconds
+ * that the renewer sleeps for each iteration.
+ */
+ private volatile long sleepPeriod;
+
+ private LeaseChecker(final long timeout) {
+ this.renewal = (timeout > 0 && timeout < LEASE_SOFTLIMIT_PERIOD)?
+ timeout/2: LEASE_SOFTLIMIT_PERIOD/2;
+ setGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+ }
+
+ /** Set the grace period and adjust the sleep period accordingly. */
+ void setGraceSleepPeriod(final long gracePeriod) {
+ if (gracePeriod < 100L) {
+ throw new HadoopIllegalArgumentException(gracePeriod
+ + " = gracePeriod < 100ms is too small.");
+ }
+ synchronized(this) {
+ this.gracePeriod = gracePeriod;
+ }
+ final long half = gracePeriod/2;
+ this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
+ half: LEASE_RENEWER_SLEEP_DEFAULT;
+ }
+
+ /** Is the daemon running? */
+ synchronized boolean isRunning() {
+ return daemon != null && daemon.isAlive();
+ }
+
+ /** Is the empty period longer than the grace period? */
+ private synchronized boolean isRenewerExpired() {
+ return emptyTime != Long.MAX_VALUE
+ && System.currentTimeMillis() - emptyTime > gracePeriod;
+ }
+
synchronized void put(String src, OutputStream out) {
if (clientRunning) {
- if (daemon == null) {
- daemon = new Daemon(this);
+ if (daemon == null || isRenewerExpired()) {
+ //start a new daemon with a new id.
+ final int id = ++currentId;
+ daemon = new Daemon(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LeaseChecker.this.run(id);
+ } catch(InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(LeaseChecker.this.getClass().getSimpleName()
+ + " is interrupted.", e);
+ }
+ }
+ }
+ });
daemon.start();
}
pendingCreates.put(src, out);
+ emptyTime = Long.MAX_VALUE;
}
}
synchronized void remove(String src) {
pendingCreates.remove(src);
+ if (pendingCreates.isEmpty() && emptyTime == Long.MAX_VALUE) {
+ //discover the first time that the map is empty.
+ emptyTime = System.currentTimeMillis();
+ }
}
void interruptAndJoin() throws InterruptedException {
Daemon daemonCopy = null;
synchronized (this) {
- if (daemon != null) {
+ if (isRunning()) {
daemon.interrupt();
daemonCopy = daemon;
}
@@ -1423,37 +1526,30 @@ public class DFSClient implements FSCons
* Periodically check in with the namenode and renew all the leases
* when the lease period is half over.
*/
- public void run() {
- long lastRenewed = 0;
- int renewal = (int)(LEASE_SOFTLIMIT_PERIOD / 2);
- if (hdfsTimeout > 0) {
- renewal = Math.min(renewal, hdfsTimeout/2);
- }
- while (clientRunning && !Thread.interrupted()) {
- if (System.currentTimeMillis() - lastRenewed > renewal) {
+ private void run(final int id) throws InterruptedException {
+ for(long lastRenewed = System.currentTimeMillis();
+ clientRunning && !Thread.interrupted();
+ Thread.sleep(sleepPeriod)) {
+ if (System.currentTimeMillis() - lastRenewed >= renewal) {
try {
renew();
lastRenewed = System.currentTimeMillis();
} catch (SocketTimeoutException ie) {
- LOG.warn("Problem renewing lease for " + clientName +
- " for a period of " + (hdfsTimeout/1000) +
- " seconds. Shutting down HDFS client...", ie);
+ LOG.warn("Failed to renew lease for " + clientName + " for "
+ + (renewal/1000) + " seconds. Aborting ...", ie);
abort();
break;
} catch (IOException ie) {
- LOG.warn("Problem renewing lease for " + clientName +
- " for a period of " + (hdfsTimeout/1000) +
- " seconds. Will retry shortly...", ie);
+ LOG.warn("Failed to renew lease for " + clientName + " for "
+ + (renewal/1000) + " seconds. Will retry shortly ...", ie);
}
}
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(this + " is interrupted.", ie);
+ synchronized(this) {
+ if (id != currentId || isRenewerExpired()) {
+ //no longer the current daemon or expired
+ return;
}
- return;
}
}
}
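The renewer thread added above derives its renewal interval from the ipc timeout and its sleep period from the grace period. A standalone sketch of that arithmetic follows; the two LEASE_RENEWER_* constants are copied from the diff, the soft-limit value is the usual one minute from FSConstants, and the example timeout is arbitrary.

public class LeaseTimingSketch {
  static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000L;      // FSConstants soft limit
  static final long LEASE_RENEWER_GRACE_DEFAULT = 60 * 1000L;
  static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;

  public static void main(String[] args) {
    final long hdfsTimeout = 20 * 1000L;  // example ipc timeout; <= 0 means none
    // Renew at half the effective lease period, as LeaseChecker(timeout) does.
    final long renewal = (hdfsTimeout > 0 && hdfsTimeout < LEASE_SOFTLIMIT_PERIOD)
        ? hdfsTimeout / 2 : LEASE_SOFTLIMIT_PERIOD / 2;
    // Sleep at most one second per iteration and at most half the grace period.
    final long half = LEASE_RENEWER_GRACE_DEFAULT / 2;
    final long sleepPeriod =
        half < LEASE_RENEWER_SLEEP_DEFAULT ? half : LEASE_RENEWER_SLEEP_DEFAULT;
    System.out.println("renew every " + renewal
        + " ms, poll every " + sleepPeriod + " ms");
  }
}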
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Apr 20 21:00:45 2011
@@ -40,6 +40,10 @@ public class DFSConfigKeys extends Commo
public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
+ public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
+ public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
+ public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
+ public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Apr 20 21:00:45 2011
@@ -31,8 +31,10 @@ import java.net.Socket;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.EnumSet;
import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
@@ -96,9 +98,6 @@ import org.apache.hadoop.util.StringUtil
* starts sending packets from the dataQueue.
****************************************************************/
class DFSOutputStream extends FSOutputSummer implements Syncable {
- /**
- *
- */
private final DFSClient dfsClient;
private Configuration conf;
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
@@ -295,10 +294,18 @@ class DFSOutputStream extends FSOutputSu
private BlockConstructionStage stage; // block construction stage
private long bytesSent = 0; // number of bytes that've been sent
+ /** Nodes have been used in the pipeline before and have failed. */
+ private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
+ /** Has the current block been hflushed? */
+ private boolean isHflushed = false;
+ /** Append on an existing block? */
+ private final boolean isAppend;
+
/**
* Default construction for file create
*/
private DataStreamer() {
+ isAppend = false;
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
@@ -311,6 +318,7 @@ class DFSOutputStream extends FSOutputSu
*/
private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
int bytesPerChecksum) throws IOException {
+ isAppend = true;
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
block = lastBlock.getBlock();
bytesSent = block.getNumBytes();
@@ -750,6 +758,105 @@ class DFSOutputStream extends FSOutputSu
return doSleep;
}
+ private void setHflush() {
+ isHflushed = true;
+ }
+
+ private int findNewDatanode(final DatanodeInfo[] original
+ ) throws IOException {
+ if (nodes.length != original.length + 1) {
+ throw new IOException("Failed to add a datanode:"
+ + " nodes.length != original.length + 1, nodes="
+ + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
+ }
+ for(int i = 0; i < nodes.length; i++) {
+ int j = 0;
+ for(; j < original.length && !nodes[i].equals(original[j]); j++);
+ if (j == original.length) {
+ return i;
+ }
+ }
+ throw new IOException("Failed: new datanode not found: nodes="
+ + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
+ }
+
+ private void addDatanode2ExistingPipeline() throws IOException {
+ if (DataTransferProtocol.LOG.isDebugEnabled()) {
+ DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
+ }
+ /*
+ * Is data transfer necessary? We have the following cases.
+ *
+ * Case 1: Failure in Pipeline Setup
+ * - Append
+ * + Transfer the stored replica, which may be a RBW or a finalized replica.
+ * - Create
+ * + If no data, then no transfer is required.
+ * + If data has been written, transfer the RBW. This case may happen
+ * when there was a streaming failure earlier in this pipeline.
+ *
+ * Case 2: Failure in Streaming
+ * - Append/Create:
+ * + transfer RBW
+ *
+ * Case 3: Failure in Close
+ * - Append/Create:
+ * + no transfer; let the NameNode replicate the block.
+ */
+ if (!isAppend && lastAckedSeqno < 0
+ && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
+ //no data have been written
+ return;
+ } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
+ || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+ //pipeline is closing
+ return;
+ }
+
+ //get a new datanode
+ final DatanodeInfo[] original = nodes;
+ final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
+ src, block, nodes, failed.toArray(new DatanodeInfo[failed.size()]),
+ 1, dfsClient.clientName);
+ nodes = lb.getLocations();
+
+ //find the new datanode
+ final int d = findNewDatanode(original);
+
+ //transfer replica
+ final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
+ final DatanodeInfo[] targets = {nodes[d]};
+ transfer(src, targets, lb.getBlockToken());
+ }
+
+ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException {
+ //transfer replica to the new datanode
+ Socket sock = null;
+ DataOutputStream out = null;
+ DataInputStream in = null;
+ try {
+ sock = createSocketForPipeline(src, 2, dfsClient);
+ final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
+ out = new DataOutputStream(new BufferedOutputStream(
+ NetUtils.getOutputStream(sock, writeTimeout),
+ DataNode.SMALL_BUFFER_SIZE));
+
+ //send the TRANSFER_BLOCK request
+ DataTransferProtocol.Sender.opTransferBlock(out, block,
+ dfsClient.clientName, targets, blockToken);
+
+ //ack
+ in = new DataInputStream(NetUtils.getInputStream(sock));
+ if (SUCCESS != DataTransferProtocol.Status.read(in)) {
+ throw new IOException("Failed to add a datanode");
+ }
+ } finally {
+ IOUtils.closeStream(in);
+ IOUtils.closeStream(out);
+ IOUtils.closeSocket(sock);
+ }
+ }
/**
* Open a DataOutputStream to a DataNode pipeline so that
@@ -793,6 +900,8 @@ class DFSOutputStream extends FSOutputSu
DFSClient.LOG.warn("Error Recovery for block " + block +
" in pipeline " + pipelineMsg +
": bad datanode " + nodes[errorIndex].getName());
+ failed.add(nodes[errorIndex]);
+
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
@@ -803,6 +912,12 @@ class DFSOutputStream extends FSOutputSu
errorIndex = -1;
}
+ // Check if replace-datanode policy is satisfied.
+ if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
+ nodes, isAppend, isHflushed)) {
+ addDatanode2ExistingPipeline();
+ }
+
// get a new generation stamp and an access token
LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
newGS = lb.getBlock().getGenerationStamp();
@@ -888,7 +1003,7 @@ class DFSOutputStream extends FSOutputSu
boolean result = false;
try {
- s = createSocketForPipeline(nodes, dfsClient);
+ s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
//
@@ -1026,18 +1141,19 @@ class DFSOutputStream extends FSOutputSu
/**
* Create a socket for a write pipeline
- * @param datanodes the datanodes on the pipeline
+ * @param first the first datanode
+ * @param length the pipeline length
* @param client
* @return the socket connected to the first datanode
*/
- static Socket createSocketForPipeline(final DatanodeInfo[] datanodes,
- final DFSClient client) throws IOException {
+ static Socket createSocketForPipeline(final DatanodeInfo first,
+ final int length, final DFSClient client) throws IOException {
if(DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Connecting to datanode " + datanodes[0].getName());
+ DFSClient.LOG.debug("Connecting to datanode " + first.getName());
}
- final InetSocketAddress isa = NetUtils.createSocketAddr(datanodes[0].getName());
+ final InetSocketAddress isa = NetUtils.createSocketAddr(first.getName());
final Socket sock = client.socketFactory.createSocket();
- final int timeout = client.getDatanodeReadTimeout(datanodes.length);
+ final int timeout = client.getDatanodeReadTimeout(length);
NetUtils.connect(sock, isa, timeout);
sock.setSoTimeout(timeout);
sock.setSendBufferSize(DFSClient.DEFAULT_DATA_SOCKET_SIZE);
@@ -1363,6 +1479,12 @@ class DFSOutputStream extends FSOutputSu
throw ioe;
}
}
+
+ synchronized(this) {
+ if (streamer != null) {
+ streamer.setHflush();
+ }
+ }
} catch (InterruptedIOException interrupt) {
// This kind of error doesn't mean that the stream itself is broken - just the
// flushing thread got interrupted. So, we shouldn't close down the writer,
@@ -1577,7 +1699,7 @@ class DFSOutputStream extends FSOutputSu
/**
* Returns the access token currently used by streamer, for testing only
*/
- Token<BlockTokenIdentifier> getBlockToken() {
+ synchronized Token<BlockTokenIdentifier> getBlockToken() {
return streamer.getBlockToken();
}
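The new-node search and replica-source selection in addDatanode2ExistingPipeline() are easy to check in isolation. The sketch below mirrors the logic above using String names in place of DatanodeInfo; the class and sample names are illustrative.

import java.io.IOException;

public class FindNewDatanodeSketch {
  /** Return the index of the one node in nodes that is absent from original. */
  static int findNewDatanode(String[] nodes, String[] original) throws IOException {
    if (nodes.length != original.length + 1) {
      throw new IOException("expected exactly one additional node");
    }
    for (int i = 0; i < nodes.length; i++) {
      int j = 0;
      for (; j < original.length && !nodes[i].equals(original[j]); j++);
      if (j == original.length) {
        return i;  // nodes[i] appears nowhere in the original pipeline
      }
    }
    throw new IOException("new datanode not found");
  }

  public static void main(String[] args) throws IOException {
    final String[] original = {"dn1", "dn3"};
    final String[] nodes = {"dn1", "dn2", "dn3"};    // pipeline after adding dn2
    final int d = findNewDatanode(nodes, original);  // d == 1
    // Replica source selection: a neighbor of the new node in the pipeline.
    final String src = d == 0 ? nodes[1] : nodes[d - 1];
    System.out.println("transfer replica from " + src + " to " + nodes[d]);
  }
}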
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Wed Apr 20 21:00:45 2011
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -242,9 +243,9 @@ public class DistributedFileSystem exten
Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
return new FSDataOutputStream(dfs.create(getPathName(f), permission,
- overwrite ? EnumSet.of(CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE),
- replication, blockSize, progress, bufferSize),
- statistics);
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
+ bufferSize), statistics);
}
@SuppressWarnings("deprecation")
@@ -266,6 +267,9 @@ public class DistributedFileSystem exten
EnumSet<CreateFlag> flag, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
+ if (flag.contains(CreateFlag.OVERWRITE)) {
+ flag.add(CreateFlag.CREATE);
+ }
return new FSDataOutputStream(dfs.create(getPathName(f), permission, flag,
false, replication, blockSize, progress, bufferSize), statistics);
}
@@ -810,6 +814,14 @@ public class DistributedFileSystem exten
throws IOException {
return dfs.getDelegationToken(renewer);
}
+
+ @Override // FileSystem
+ public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
+ List<Token<?>> tokenList = new ArrayList<Token<?>>();
+ Token<DelegationTokenIdentifier> token = this.getDelegationToken(renewer);
+ tokenList.add(token);
+ return tokenList;
+ }
/**
* Renew an existing delegation token.
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Apr 20 21:00:45 2011
@@ -67,9 +67,9 @@ public interface ClientProtocol extends
* Compared to the previous version the following changes have been introduced:
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
- * 66: Add block pool ID to Block
+ * 67: Add block pool ID to Block
*/
- public static final long versionID = 66L;
+ public static final long versionID = 67L;
///////////////////////////////////////
// File contents
@@ -298,6 +298,30 @@ public interface ClientProtocol extends
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
IOException;
+ /**
+ * Get a datanode for an existing pipeline.
+ *
+ * @param src the file being written
+ * @param blk the block being written
+ * @param existings the existing nodes in the pipeline
+ * @param excludes the excluded nodes
+ * @param numAdditionalNodes number of additional datanodes
+ * @param clientName the name of the client
+ *
+ * @return the located block.
+ *
+ * @throws AccessControlException If access is denied
+ * @throws FileNotFoundException If file <code>src</code> is not found
+ * @throws SafeModeException create not allowed in safemode
+ * @throws UnresolvedLinkException If <code>src</code> contains a symlink
+ * @throws IOException If an I/O error occurred
+ */
+ public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+ final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+ final int numAdditionalNodes, final String clientName
+ ) throws AccessControlException, FileNotFoundException,
+ SafeModeException, UnresolvedLinkException, IOException;
+
/**
* The client is done writing data to the given filename, and would
* like to complete it.
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Wed Apr 20 21:00:45 2011
@@ -25,8 +25,13 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -39,18 +44,18 @@ import org.apache.hadoop.security.token.
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface DataTransferProtocol {
-
+ public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class);
/** Version for data transfers between clients and datanodes
* This should change when the serialization of DatanodeInfo changes, not
* just when the protocol changes. It is not very obvious.
*/
/*
- * Version 22:
+ * Version 23:
+ * Version 23:
* Changed the protocol methods to use ExtendedBlock instead
* of Block.
*/
- public static final int DATA_TRANSFER_VERSION = 21;
+ public static final int DATA_TRANSFER_VERSION = 23;
/** Operation */
public enum Op {
@@ -750,4 +755,93 @@ public interface DataTransferProtocol {
}
}
+ /**
+ * The setting of replace-datanode-on-failure feature.
+ */
+ public enum ReplaceDatanodeOnFailure {
+ /** The feature is disabled in the entire site. */
+ DISABLE,
+ /** Never add a new datanode. */
+ NEVER,
+ /**
+ * DEFAULT policy:
+ * Let r be the replication number.
+ * Let n be the number of existing datanodes.
+ * Add a new datanode only if r >= 3 and either
+ * (1) floor(r/2) >= n; or
+ * (2) r > n and the block is hflushed/appended.
+ */
+ DEFAULT,
+ /** Always add a new datanode when an existing datanode is removed. */
+ ALWAYS;
+
+ /** Check if the feature is enabled. */
+ public void checkEnabled() {
+ if (this == DISABLE) {
+ throw new UnsupportedOperationException(
+ "This feature is disabled. Please refer to "
+ + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
+ + " configuration property.");
+ }
+ }
+
+ /** Is the policy satisfied? */
+ public boolean satisfy(
+ final short replication, final DatanodeInfo[] existings,
+ final boolean isAppend, final boolean isHflushed) {
+ final int n = existings == null? 0: existings.length;
+ if (n == 0 || n >= replication) {
+ //don't need to add datanode for any policy.
+ return false;
+ } else if (this == DISABLE || this == NEVER) {
+ return false;
+ } else if (this == ALWAYS) {
+ return true;
+ } else {
+ //DEFAULT
+ if (replication < 3) {
+ return false;
+ } else {
+ if (n <= (replication/2)) {
+ return true;
+ } else {
+ return isAppend || isHflushed;
+ }
+ }
+ }
+ }
+
+ /** Get the setting from configuration. */
+ public static ReplaceDatanodeOnFailure get(final Configuration conf) {
+ final boolean enabled = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
+ if (!enabled) {
+ return DISABLE;
+ }
+
+ final String policy = conf.get(
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
+ for(int i = 1; i < values().length; i++) {
+ final ReplaceDatanodeOnFailure rdof = values()[i];
+ if (rdof.name().equalsIgnoreCase(policy)) {
+ return rdof;
+ }
+ }
+ throw new HadoopIllegalArgumentException("Illegal configuration value for "
+ + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY
+ + ": " + policy);
+ }
+
+ /** Write the setting to configuration. */
+ public void write(final Configuration conf) {
+ conf.setBoolean(
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+ this != DISABLE);
+ conf.set(
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+ name());
+ }
+ }
}
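Restating the DEFAULT branch of satisfy(..) as a standalone predicate makes the policy easy to verify against the hdfs-default.xml description earlier in this commit. A sketch, with illustrative names:

public class DefaultPolicySketch {
  /** The DEFAULT decision: r = replication, n = surviving pipeline nodes. */
  static boolean addDatanode(short r, int n, boolean isAppend, boolean isHflushed) {
    if (n == 0 || n >= r) return false;  // nothing to repair under any policy
    if (r < 3) return false;             // DEFAULT never acts below r = 3
    return n <= r / 2 || isAppend || isHflushed;
  }

  public static void main(String[] args) {
    // r=3, one node lost (n=2): floor(3/2)=1 < 2, so only append/hflush triggers it.
    System.out.println(addDatanode((short) 3, 2, false, false));  // false
    System.out.println(addDatanode((short) 3, 2, true, false));   // true
    // r=3, two nodes lost (n=1): floor(3/2)=1 >= 1, so a node is always added.
    System.out.println(addDatanode((short) 3, 1, false, false));  // true
  }
}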
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Wed Apr 20 21:00:45 2011
@@ -88,7 +88,7 @@ public interface FSConstants {
// Version is reflected in the data storage file.
// Versions are negative.
// Decrement LAYOUT_VERSION to define a new version.
- public static final int LAYOUT_VERSION = -29;
+ public static final int LAYOUT_VERSION = -31;
// Current version:
- // -29: Adding support for block pools and multiple namenodes
+ // -31: Adding support for block pools and multiple namenodes
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Wed Apr 20 21:00:45 2011
@@ -79,7 +79,7 @@ public abstract class Storage extends St
public static final int PRE_RBW_LAYOUT_VERSION = -19;
// last layout version that is before federation
- public static final int LAST_PRE_FEDERATION_LAYOUT_VERSION = -28;
+ public static final int LAST_PRE_FEDERATION_LAYOUT_VERSION = -30;
private static final String STORAGE_FILE_LOCK = "in_use.lock";
protected static final String STORAGE_FILE_VERSION = "VERSION";
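Both layout-version bumps follow the convention noted in FSConstants: versions are negative and each change decrements the value, so "newer than" is expressed with '<'. An illustrative comparison (the local variables are assumptions, not code from this commit):

    // A storage directory stamped with layout version -31 is newer than the
    // last pre-federation layout (-30), hence it is a federated layout.
    final int storedLayoutVersion = -31;
    final boolean federated =
        storedLayoutVersion < Storage.LAST_PRE_FEDERATION_LAYOUT_VERSION; // true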
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Apr 20 21:00:45 2011
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.pro
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
import java.io.BufferedOutputStream;
+import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
@@ -56,7 +57,7 @@ import org.apache.hadoop.util.StringUtil
* may copy it to another site. If a throttler is provided,
* streaming throttling is also supported.
**/
-class BlockReceiver implements java.io.Closeable, FSConstants {
+class BlockReceiver implements Closeable, FSConstants {
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
@@ -652,7 +653,7 @@ class BlockReceiver implements java.io.C
DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
String mirrAddr, DataTransferThrottler throttlerArg,
- int numTargets) throws IOException {
+ DatanodeInfo[] downstreams) throws IOException {
boolean responderClosed = false;
mirrorOut = mirrOut;
@@ -662,9 +663,8 @@ class BlockReceiver implements java.io.C
try {
if (isClient && !isTransfer) {
responder = new Daemon(datanode.threadGroup,
- new PacketResponder(this, block, mirrIn, replyOut,
- numTargets, Thread.currentThread()));
- responder.start(); // start thread to processes reponses
+ new PacketResponder(replyOut, mirrIn, downstreams));
+ responder.start(); // start thread to process responses
}
/*
@@ -700,8 +700,7 @@ class BlockReceiver implements java.io.C
}
} catch (IOException ioe) {
- LOG.info("Exception in receiveBlock for block " + block +
- " " + ioe);
+ LOG.info("Exception in receiveBlock for " + block, ioe);
throw ioe;
} finally {
if (!responderClosed) { // Abnormal termination of the flow above
@@ -808,51 +807,71 @@ class BlockReceiver implements java.io.C
}
}
+ private static enum PacketResponderType {
+ NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
+ }
/**
* Processes responses from downstream datanodes in the pipeline
* and sends back replies to the originator.
*/
- class PacketResponder implements Runnable, FSConstants {
+ class PacketResponder implements Runnable, Closeable, FSConstants {
- //packet waiting for ack
- private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
+ /** queue for packets waiting for ack */
+ private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
+ /** the thread that spawns this responder */
+ private final Thread receiverThread = Thread.currentThread();
+ /** is this responder running? */
private volatile boolean running = true;
- private ExtendedBlock block;
- DataInputStream mirrorIn; // input from downstream datanode
- DataOutputStream replyOut; // output to upstream datanode
- private int numTargets; // number of downstream datanodes including myself
- private BlockReceiver receiver; // The owner of this responder.
- private Thread receiverThread; // the thread that spawns this responder
+ /** input from the next downstream datanode */
+ private final DataInputStream downstreamIn;
+ /** output to upstream datanode/client */
+ private final DataOutputStream upstreamOut;
+
+ /** The type of this responder */
+ private final PacketResponderType type;
+ /** for log and error messages */
+ private final String myString;
+
+ @Override
public String toString() {
- return "PacketResponder " + numTargets + " for Block " + this.block;
+ return myString;
}
- PacketResponder(BlockReceiver receiver, ExtendedBlock b, DataInputStream in,
- DataOutputStream out, int numTargets,
- Thread receiverThread) {
- this.receiverThread = receiverThread;
- this.receiver = receiver;
- this.block = b;
- mirrorIn = in;
- replyOut = out;
- this.numTargets = numTargets;
+ PacketResponder(final DataOutputStream upstreamOut,
+ final DataInputStream downstreamIn,
+ final DatanodeInfo[] downstreams) {
+ this.downstreamIn = downstreamIn;
+ this.upstreamOut = upstreamOut;
+
+ this.type = downstreams == null? PacketResponderType.NON_PIPELINE
+ : downstreams.length == 0? PacketResponderType.LAST_IN_PIPELINE
+ : PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE;
+
+ final StringBuilder b = new StringBuilder(getClass().getSimpleName())
+ .append(": ").append(block).append(", type=").append(type);
+ if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
+ b.append(", downstreams=").append(downstreams.length)
+ .append(":").append(Arrays.asList(downstreams));
+ }
+ this.myString = b.toString();
}
/**
* enqueue the seqno that is still to be acked by the downstream datanode.
* @param seqno
* @param lastPacketInBlock
- * @param lastByteInPacket
+ * @param offsetInBlock
*/
- synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
+ synchronized void enqueue(final long seqno,
+ final boolean lastPacketInBlock, final long offsetInBlock) {
if (running) {
+ final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock);
if(LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
- " to ack queue.");
+ LOG.debug(myString + ": enqueue " + p);
}
- ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
+ ackQueue.addLast(p);
notifyAll();
}
}
@@ -860,7 +879,8 @@ class BlockReceiver implements java.io.C
/**
* wait for all pending packets to be acked. Then shutdown thread.
*/
- synchronized void close() {
+ @Override
+ public synchronized void close() {
while (running && ackQueue.size() != 0 && datanode.shouldRun) {
try {
wait();
@@ -869,8 +889,7 @@ class BlockReceiver implements java.io.C
}
}
if(LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block + " Closing down.");
+ LOG.debug(myString + ": closing");
}
running = false;
notifyAll();
@@ -892,21 +911,21 @@ class BlockReceiver implements java.io.C
PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO;
try {
- if (numTargets != 0 && !mirrorError) {// not the last DN & no mirror error
+ if (type != PacketResponderType.LAST_IN_PIPELINE
+ && !mirrorError) {
// read an ack from downstream datanode
- ack.readFields(mirrorIn);
+ ack.readFields(downstreamIn);
if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets + " got " + ack);
+ LOG.debug(myString + " got " + ack);
}
seqno = ack.getSeqno();
}
- if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
+ if (seqno != PipelineAck.UNKOWN_SEQNO
+ || type == PacketResponderType.LAST_IN_PIPELINE) {
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets +
- " seqno = " + seqno +
- " for block " + block +
+ LOG.debug(myString + ": seqno=" + seqno +
" waiting for local datanode to finish write.");
}
wait();
@@ -916,11 +935,10 @@ class BlockReceiver implements java.io.C
}
pkt = ackQueue.getFirst();
expected = pkt.seqno;
- if (numTargets > 0 && seqno != expected) {
- throw new IOException("PacketResponder " + numTargets +
- " for block " + block +
- " expected seqno:" + expected +
- " received:" + seqno);
+ if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
+ && seqno != expected) {
+ throw new IOException(myString + ": seqno: expected="
+ + expected + ", received=" + seqno);
}
lastPacketInBlock = pkt.lastPacketInBlock;
}
@@ -935,8 +953,7 @@ class BlockReceiver implements java.io.C
// notify client of the error
// and wait for the client to shut down the pipeline
mirrorError = true;
- LOG.info("PacketResponder " + block + " " + numTargets +
- " Exception " + StringUtils.stringifyException(ioe));
+ LOG.info(myString, ioe);
}
}
@@ -948,8 +965,7 @@ class BlockReceiver implements java.io.C
* because this datanode has a problem. The upstream datanode
* will detect that this datanode is bad, and rightly so.
*/
- LOG.info("PacketResponder " + block + " " + numTargets +
- " : Thread is interrupted.");
+ LOG.info(myString + ": Thread is interrupted.");
running = false;
continue;
}
@@ -957,7 +973,7 @@ class BlockReceiver implements java.io.C
// If this is the last packet in block, then close block
// file and finalize the block before responding success
if (lastPacketInBlock) {
- receiver.close();
+ BlockReceiver.this.close();
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
@@ -967,13 +983,12 @@ class BlockReceiver implements java.io.C
DatanodeRegistration dnR =
datanode.getDNRegistrationForBP(block.getBlockPoolId());
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
- receiver.inAddr, receiver.myAddr, block.getNumBytes(),
- "HDFS_WRITE", receiver.clientname, offset,
+ inAddr, myAddr, block.getNumBytes(),
+ "HDFS_WRITE", clientname, offset,
dnR.getStorageID(), block, endTime-startTime));
} else {
- LOG.info("Received block " + block +
- " of size " + block.getNumBytes() +
- " from " + receiver.inAddr);
+ LOG.info("Received block " + block + " of size "
+ + block.getNumBytes() + " from " + inAddr);
}
}
@@ -984,7 +999,8 @@ class BlockReceiver implements java.io.C
replies[0] = SUCCESS;
replies[1] = ERROR;
} else {
- short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+ short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
+ : ack.getNumOfReplies();
replies = new Status[1+ackLen];
replies[0] = SUCCESS;
for (int i=0; i<ackLen; i++) {
@@ -994,20 +1010,18 @@ class BlockReceiver implements java.io.C
PipelineAck replyAck = new PipelineAck(expected, replies);
// send my ack back to upstream datanode
- replyAck.write(replyOut);
- replyOut.flush();
+ replyAck.write(upstreamOut);
+ upstreamOut.flush();
if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " responded an ack: " + replyAck);
+ LOG.debug(myString + ", replyAck=" + replyAck);
}
if (pkt != null) {
// remove the packet from the ack queue
removeAckHead();
// update bytes acked
if (replyAck.isSuccess() &&
- pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
- replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+ pkt.offsetInBlock > replicaInfo.getBytesAcked()) {
+ replicaInfo.setBytesAcked(pkt.offsetInBlock);
}
}
} catch (IOException e) {
@@ -1018,8 +1032,7 @@ class BlockReceiver implements java.io.C
} catch (IOException ioe) {
LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
}
- LOG.info("PacketResponder " + block + " " + numTargets +
- " Exception " + StringUtils.stringifyException(e));
+ LOG.info(myString, e);
running = false;
if (!Thread.interrupted()) { // failure not caused by interruption
receiverThread.interrupt();
@@ -1027,15 +1040,13 @@ class BlockReceiver implements java.io.C
}
} catch (Throwable e) {
if (running) {
- LOG.info("PacketResponder " + block + " " + numTargets +
- " Exception " + StringUtils.stringifyException(e));
+ LOG.info(myString, e);
running = false;
receiverThread.interrupt();
}
}
}
- LOG.info("PacketResponder " + numTargets +
- " for block " + block + " terminating");
+ LOG.info(myString + " terminating");
}
/**
@@ -1052,15 +1063,23 @@ class BlockReceiver implements java.io.C
/**
* This information is cached by the Datanode in the ackQueue.
*/
- static private class Packet {
- long seqno;
- boolean lastPacketInBlock;
- long lastByteInBlock;
+ private static class Packet {
+ final long seqno;
+ final boolean lastPacketInBlock;
+ final long offsetInBlock;
- Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
+ Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
this.seqno = seqno;
this.lastPacketInBlock = lastPacketInBlock;
- this.lastByteInBlock = lastByteInPacket;
+ this.offsetInBlock = offsetInBlock;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "(seqno=" + seqno
+ + ", lastPacketInBlock=" + lastPacketInBlock
+ + ", offsetInBlock=" + offsetInBlock
+ + ")";
}
}
}
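The rewritten PacketResponder derives its role from the downstream target list instead of the old numTargets count. A condensed sketch of that mapping (the helper name is hypothetical; the logic mirrors the constructor's ternary):

    static PacketResponderType responderTypeOf(DatanodeInfo[] downstreams) {
      if (downstreams == null) {
        return PacketResponderType.NON_PIPELINE;      // no client write pipeline
      } else if (downstreams.length == 0) {
        return PacketResponderType.LAST_IN_PIPELINE;  // tail datanode, originates acks
      }
      return PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE; // relays downstream acks
    }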
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Apr 20 21:00:45 2011
@@ -1907,8 +1907,9 @@ public class DataNode extends Configured
*/
DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
final String clientname) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getClass().getSimpleName() + ": " + b
+ if (DataTransferProtocol.LOG.isDebugEnabled()) {
+ DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
+ + b + " (numBytes=" + b.getNumBytes() + ")"
+ ", stage=" + stage
+ ", clientname=" + clientname
+ ", targests=" + Arrays.asList(targets));
@@ -2573,12 +2574,9 @@ public class DataNode extends Configured
* the stored GS and the visible length.
* @param targets
* @param client
- * @return whether the replica is an RBW
*/
- boolean transferReplicaForPipelineRecovery(final ExtendedBlock b,
+ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
final DatanodeInfo[] targets, final String client) throws IOException {
- checkWriteAccess(b);
-
final long storedGS;
final long visible;
final BlockConstructionStage stage;
@@ -2590,7 +2588,8 @@ public class DataNode extends Configured
} else if (data.isValidBlock(b)) {
stage = BlockConstructionStage.TRANSFER_FINALIZED;
} else {
- throw new IOException(b + " is not a RBW or a Finalized");
+ final String r = data.getReplicaString(b.getBlockPoolId(), b.getBlockId());
+ throw new IOException(b + " is neither a RBW nor a Finalized replica, r=" + r);
}
storedGS = data.getStoredBlock(b.getBlockPoolId(),
@@ -2609,7 +2608,6 @@ public class DataNode extends Configured
if (targets.length > 0) {
new DataTransfer(targets, b, stage, client).run();
}
- return stage == BlockConstructionStage.TRANSFER_RBW;
}
// Determine a Datanode's streaming address
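transferReplicaForPipelineRecovery now picks the transfer stage purely from the replica state and reports the replica string on failure. A hedged sketch of that decision (not the literal method body; the RBW predicate is assumed from the elided first branch of the hunk):

    BlockConstructionStage stageFor(FSDatasetInterface data, ExtendedBlock b)
        throws IOException {
      if (data.isValidRbw(b)) {              // assumed replica-being-written test
        return BlockConstructionStage.TRANSFER_RBW;
      } else if (data.isValidBlock(b)) {     // finalized replica
        return BlockConstructionStage.TRANSFER_FINALIZED;
      }
      throw new IOException(b + " is neither a RBW nor a Finalized replica, r="
          + data.getReplicaString(b.getBlockPoolId(), b.getBlockId()));
    }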
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Apr 20 21:00:45 2011
@@ -154,7 +154,7 @@ class DataXceiver extends DataTransferPr
datanode.socketWriteTimeout);
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
- checkAccess(out, block, blockToken,
+ checkAccess(out, true, block, blockToken,
DataTransferProtocol.Op.READ_BLOCK,
BlockTokenSecretManager.AccessMode.READ);
@@ -258,7 +258,7 @@ class DataXceiver extends DataTransferPr
new BufferedOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
SMALL_BUFFER_SIZE));
- checkAccess(isClient? replyOut: null, block, blockToken,
+ checkAccess(replyOut, isClient, block, blockToken,
DataTransferProtocol.Op.WRITE_BLOCK,
BlockTokenSecretManager.AccessMode.WRITE);
@@ -365,7 +365,7 @@ class DataXceiver extends DataTransferPr
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
- mirrorAddr, null, targets.length);
+ mirrorAddr, null, targets);
// send close-ack for transfer-RBW/Finalized
if (isTransfer) {
@@ -419,13 +419,14 @@ class DataXceiver extends DataTransferPr
final ExtendedBlock blk, final String client,
final DatanodeInfo[] targets,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
- final DataOutputStream out = new DataOutputStream(
- NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
- checkAccess(out, blk, blockToken,
+ checkAccess(null, true, blk, blockToken,
DataTransferProtocol.Op.TRANSFER_BLOCK,
BlockTokenSecretManager.AccessMode.COPY);
updateCurrentThreadName(DataTransferProtocol.Op.TRANSFER_BLOCK + " " + blk);
+
+ final DataOutputStream out = new DataOutputStream(
+ NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
try {
datanode.transferReplicaForPipelineRecovery(blk, targets, client);
SUCCESS.write(out);
@@ -442,7 +443,7 @@ class DataXceiver extends DataTransferPr
Token<BlockTokenIdentifier> blockToken) throws IOException {
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
- checkAccess(out, block, blockToken,
+ checkAccess(out, true, block, blockToken,
DataTransferProtocol.Op.BLOCK_CHECKSUM,
BlockTokenSecretManager.AccessMode.READ);
updateCurrentThreadName("Reading metadata for block " + block);
@@ -634,7 +635,7 @@ class DataXceiver extends DataTransferPr
// receive a block
blockReceiver.receiveBlock(null, null, null, null,
- dataXceiverServer.balanceThrottler, -1);
+ dataXceiverServer.balanceThrottler, null);
// notify name node
datanode.notifyNamenodeReceivedBlock(block, sourceID);
@@ -699,7 +700,7 @@ class DataXceiver extends DataTransferPr
}
}
- private void checkAccess(final DataOutputStream out,
+ private void checkAccess(DataOutputStream out, final boolean reply,
final ExtendedBlock blk,
final Token<BlockTokenIdentifier> t,
final DataTransferProtocol.Op op,
@@ -709,7 +710,11 @@ class DataXceiver extends DataTransferPr
datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode);
} catch(InvalidToken e) {
try {
- if (out != null) {
+ if (reply) {
+ if (out == null) {
+ out = new DataOutputStream(
+ NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+ }
ERROR_ACCESS_TOKEN.write(out);
if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
DatanodeRegistration dnR =
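The checkAccess change lets callers defer reply-stream creation: opTransferBlock now passes null, and a stream is built only if an access-token failure must actually be reported. A condensed sketch of the pattern (hypothetical helper; 's' and the timeout field mirror the surrounding code):

    private void replyTokenError(DataOutputStream out) throws IOException {
      if (out == null) {               // e.g. the TRANSFER_BLOCK path passes null
        out = new DataOutputStream(
            NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
      }
      ERROR_ACCESS_TOKEN.write(out);   // tell the upstream peer the token was rejected
      out.flush();
    }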
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Apr 20 21:00:45 2011
@@ -2369,6 +2369,12 @@ public class FSDataset implements FSCons
return volumeMap.get(bpid, blockId);
}
+ @Override
+ public synchronized String getReplicaString(String bpid, long blockId) {
+ final Replica r = volumeMap.get(bpid, blockId);
+ return r == null? "null": r.toString();
+ }
+
@Override // FSDatasetInterface
public synchronized ReplicaRecoveryInfo initReplicaRecovery(
RecoveringBlock rBlock) throws IOException {
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Wed Apr 20 21:00:45 2011
@@ -107,6 +107,11 @@ public interface FSDatasetInterface exte
public Replica getReplica(String bpid, long blockId);
/**
+ * @return replica meta information
+ */
+ public String getReplicaString(String bpid, long blockId);
+
+ /**
* @return the generation stamp stored with the block.
*/
public Block getStoredBlock(String bpid, long blkid)
Propchange: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 21:00:45 2011
@@ -4,4 +4,4 @@
/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:776175-785643,785929-786278
/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:820487
-/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696,1090114-1095461