Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/04/20 23:00:48 UTC

svn commit: r1095512 [1/3] - in /hadoop/hdfs/branches/HDFS-1052: ./ src/c++/libhdfs/ src/contrib/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/ap...

Author: suresh
Date: Wed Apr 20 21:00:45 2011
New Revision: 1095512

URL: http://svn.apache.org/viewvc?rev=1095512&view=rev
Log:
Merging changes r1090113:r1095461 from trunk to federation

Added:
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/CLITestCmdDFS.java
      - copied unchanged from r1095461, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/CLITestCmdDFS.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/CLITestHelperDFS.java
      - copied unchanged from r1095461, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/CLITestHelperDFS.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/util/
      - copied from r1095461, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/util/
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/util/CLICommandDFSAdmin.java
      - copied unchanged from r1095461, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/util/CLICommandDFSAdmin.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestResolveHdfsSymlink.java
      - copied unchanged from r1095461, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/TestResolveHdfsSymlink.java
Removed:
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java
Modified:
    hadoop/hdfs/branches/HDFS-1052/   (props changed)
    hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1052/build.xml   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/contrib/build.xml
    hadoop/hdfs/branches/HDFS-1052/src/contrib/hdfsproxy/   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/java/   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/java/hdfs-default.xml
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/fs/Hdfs.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
    hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
    hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
    hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
    hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
    hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
    hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
    hadoop/hdfs/branches/HDFS-1052/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
    hadoop/hdfs/branches/HDFS-1052/src/webapps/datanode/   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/webapps/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/webapps/secondary/   (props changed)

Propchange: hadoop/hdfs/branches/HDFS-1052/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 21:00:45 2011
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs:713112
 /hadoop/hdfs/branches/HDFS-265:796829-820463
 /hadoop/hdfs/branches/branch-0.21:820487
-/hadoop/hdfs/trunk:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036738,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696
+/hadoop/hdfs/trunk:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036738,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696,1090114-1095461

Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Wed Apr 20 21:00:45 2011
@@ -258,6 +258,16 @@ Trunk (unreleased changes)
     HDFS-1813. Federation: Authentication using BlockToken in RPC to datanode 
                fails. (jitendra)
 
+    HDFS-1630. Support fsedits checksum. (hairong)
+
+    HDFS-1606. Provide a stronger data guarantee in the write pipeline by
+    adding a new datanode when an existing datanode failed.  (szetszwo)
+
+    HDFS-1442. Api to get delegation token in Hdfs class. (jitendra)
+
+    HDFS-1070. Speedup namenode image loading and saving by storing only
+    local file names. (hairong)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)
@@ -327,6 +337,25 @@ Trunk (unreleased changes)
     HDFS-1767. Namenode ignores non-initial block report from datanodes
     when in safemode during startup. (Matt Foley via suresh)
 
+    HDFS-1817. Move pipeline_Fi_[39-51] from TestFiDataTransferProtocol
+    to TestFiPipelineClose.  (szetszwo)
+
+    HDFS-1760. In FSDirectory.getFullPathName(..), it is better to return "/"
+    for root directory instead of an empty string.  (Daryn Sharp via szetszwo)
+
+    HDFS-1833. Reduce repeated string constructions and unnecessary fields,
+    and fix comments in BlockReceiver.PacketResponder.  (szetszwo)
+
+    HDFS-1486. Generalize CLITest structure and interfaces to facilitate
+    upstream adoption (e.g. for web testing). (cos)
+
+    HDFS-1844. Move "fs -help" shell command tests from HDFS to COMMON; see
+    also HADOOP-7230.  (Daryn Sharp via szetszwo)
+
+    HDFS-1840. In DFSClient, terminate the lease renewing thread when all files
+    being written are closed for a grace period, and start a new thread when
+    new files are opened for write.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -397,6 +426,18 @@ Trunk (unreleased changes)
     HDFS-1543. Reduce dev. cycle time by moving system testing artifacts from
     default build and push to maven for HDFS (Luke Lu via cos)
 
+    HDFS-1818. TestHDFSCLI is failing on trunk after HADOOP-7202.
+    (Aaron T. Myers via todd)
+
+    HDFS-1828. TestBlocksWithNotEnoughRacks intermittently fails assert.
+    (Matt Foley via eli)
+
+    HDFS-1824. delay instantiation of file system object until it is
+     needed (linked to HADOOP-7207) (boryas)
+
+    HDFS-1831. Fix append bug in FileContext and implement CreateFlag
+    check (related to HADOOP-7223). (suresh)
+
 Release 0.22.0 - Unreleased
 
   NEW FEATURES
@@ -853,9 +894,20 @@ Release 0.21.1 - Unreleased
 
     HDFS-1781. Fix the path for jsvc in bin/hdfs.  (John George via szetszwo)
 
-    HDFS-1782. Fix an NPE in RFSNamesystem.startFileInternal(..).
+    HDFS-1782. Fix an NPE in FSNamesystem.startFileInternal(..).
     (John George via szetszwo)
 
+    HDFS-1821. Fix username resolution in NameNode.createSymlink(..) and
+    FSDirectory.addSymlink(..).  (John George via szetszwo)
+
+    HDFS-1806. TestBlockReport.blockReport_08() and _09() are timing-dependent
+    and likely to fail on fast servers. (Matt Foley via eli)
+
+    HDFS-1845. Symlink comes up as directory after namenode restart.
+    (John George via eli)
+
+    HDFS-1666. Disable failing hdfsproxy test TestAuthorizationFilter (todd)
+
 Release 0.21.1 - Unreleased
 
     HDFS-1411. Correct backup node startup command in hdfs user guide.

Propchange: hadoop/hdfs/branches/HDFS-1052/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 21:00:45 2011
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/build.xml:779102
 /hadoop/hdfs/branches/HDFS-265/build.xml:796829-820463
 /hadoop/hdfs/branches/branch-0.21/build.xml:820487
-/hadoop/hdfs/trunk/build.xml:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696
+/hadoop/hdfs/trunk/build.xml:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696,1090114-1095461

Propchange: hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 21:00:45 2011
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
 /hadoop/core/trunk/src/c++/libhdfs:776175-784663
-/hadoop/hdfs/trunk/src/c++/libhdfs:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696
+/hadoop/hdfs/trunk/src/c++/libhdfs:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696,1090114-1095461

Modified: hadoop/hdfs/branches/HDFS-1052/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/contrib/build.xml?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/contrib/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/contrib/build.xml Wed Apr 20 21:00:45 2011
@@ -46,9 +46,11 @@
   <!-- Test all the contribs.                               -->
   <!-- ====================================================== -->
   <target name="test">
+    <!-- hdfsproxy tests failing due to HDFS-1666
     <subant target="test">
       <fileset dir="." includes="hdfsproxy/build.xml"/>
     </subant>
+      -->
   </target>
   
   

Propchange: hadoop/hdfs/branches/HDFS-1052/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 21:00:45 2011
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy:820487
-/hadoop/hdfs/trunk/src/contrib/hdfsproxy:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696,1090114-1095461

Propchange: hadoop/hdfs/branches/HDFS-1052/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 21:00:45 2011
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/java:820487
-/hadoop/hdfs/trunk/src/java:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696
+/hadoop/hdfs/trunk/src/java:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696,1090114-1095461

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/hdfs-default.xml?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/hdfs-default.xml (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/hdfs-default.xml Wed Apr 20 21:00:45 2011
@@ -318,6 +318,42 @@ creations/deletions), or "all".</descrip
 </property>
 
 <property>
+  <name>dfs.client.block.write.replace-datanode-on-failure.enable</name>
+  <value>true</value>
+  <description>
+    If there is a datanode/network failure in the write pipeline,
+    DFSClient will try to remove the failed datanode from the pipeline
+    and then continue writing with the remaining datanodes. As a result,
+    the number of datanodes in the pipeline is decreased.  The feature is
+    to add new datanodes to the pipeline.
+
+    This is a site-wide property to enable/disable the feature.
+
+    See also dfs.client.block.write.replace-datanode-on-failure.policy
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
+  <value>DEFAULT</value>
+  <description>
+    This property is used only if the value of
+    dfs.client.block.write.replace-datanode-on-failure.enable is true.
+
+    ALWAYS: always add a new datanode when an existing datanode is removed.
+    
+    NEVER: never add a new datanode.
+
+    DEFAULT: 
+      Let r be the replication number.
+      Let n be the number of existing datanodes.
+      Add a new datanode only if r is greater than or equal to 3 and either
+      (1) floor(r/2) is greater than or equal to n; or
+      (2) r is greater than n and the block is hflushed/appended.
+  </description>
+</property>
+
+<property>
   <name>dfs.blockreport.intervalMsec</name>
   <value>21600000</value>
   <description>Determines block reporting interval in milliseconds.</description>
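
For reference, a minimal client-side sketch, not part of this patch, showing how the two properties documented above can be set programmatically and resolved through DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf), which this commit adds further below; the property names and values are the documented ones, while the class and main method are purely illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.ReplaceDatanodeOnFailure;

    public class ReplaceDatanodePolicyConfig {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // enable the feature (true is also the default) and pick ALWAYS, NEVER or DEFAULT
        conf.setBoolean(
            "dfs.client.block.write.replace-datanode-on-failure.enable", true);
        conf.set(
            "dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");
        // DFSClient resolves the effective policy the same way at construction time
        ReplaceDatanodeOnFailure policy = ReplaceDatanodeOnFailure.get(conf);
        System.out.println("effective policy: " + policy);
      }
    }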

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/fs/Hdfs.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/fs/Hdfs.java Wed Apr 20 21:00:45 2011
@@ -25,6 +25,8 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.List;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -37,8 +39,13 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.util.Progressable;
 
 @InterfaceAudience.Private
@@ -249,7 +256,7 @@ public class Hdfs extends AbstractFileSy
       if (hasNext()) {
         return thisListing.getPartialListing()[i++];
       }
-      throw new java.util.NoSuchElementException("No more entry in " + src);
+      throw new NoSuchElementException("No more entry in " + src);
     }
   }
 
@@ -384,4 +391,43 @@ public class Hdfs extends AbstractFileSy
   public Path getLinkTarget(Path p) throws IOException { 
     return new Path(dfs.getLinkTarget(getUriPath(p)));
   }
+  
+  @Override //AbstractFileSystem
+  public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
+    Token<DelegationTokenIdentifier> result = dfs
+        .getDelegationToken(renewer == null ? null : new Text(renewer));
+    result.setService(new Text(this.getCanonicalServiceName()));
+    List<Token<?>> tokenList = new ArrayList<Token<?>>();
+    tokenList.add(result);
+    return tokenList;
+  }
+
+  /**
+   * Renew an existing delegation token.
+   * 
+   * @param token delegation token obtained earlier
+   * @return the new expiration time
+   * @throws InvalidToken
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public long renewDelegationToken(
+      Token<? extends AbstractDelegationTokenIdentifier> token)
+      throws InvalidToken, IOException {
+    return dfs.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
+  }
+
+  /**
+   * Cancel an existing delegation token.
+   * 
+   * @param token delegation token
+   * @throws InvalidToken
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public void cancelDelegationToken(
+      Token<? extends AbstractDelegationTokenIdentifier> token)
+      throws InvalidToken, IOException {
+    dfs.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
+  }
 }
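
A hedged usage sketch of the delegation token methods added to Hdfs above (HDFS-1442); only getDelegationTokens, renewDelegationToken and cancelDelegationToken come from this patch, while the AbstractFileSystem.get lookup, the URI and the renewer name are illustrative assumptions:

    import java.net.URI;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.AbstractFileSystem;
    import org.apache.hadoop.fs.Hdfs;
    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    public class HdfsDelegationTokenSketch {
      @SuppressWarnings("unchecked")
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // assumption: a reachable cluster at this example URI
        Hdfs fs = (Hdfs) AbstractFileSystem.get(URI.create("hdfs://namenode:8020"), conf);

        // the token's service field is set to the canonical service name,
        // as done in getDelegationTokens(..) above
        List<Token<?>> tokens = fs.getDelegationTokens("mapred");
        Token<DelegationTokenIdentifier> token =
            (Token<DelegationTokenIdentifier>) tokens.get(0);

        long newExpiry = fs.renewDelegationToken(token);  // returns the new expiration time
        System.out.println("renewed until " + newExpiry);
        fs.cancelDelegationToken(token);
      }
    }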

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSClient.java Wed Apr 20 21:00:45 2011
@@ -45,6 +45,7 @@ import javax.net.SocketFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -128,15 +129,16 @@ public class DFSClient implements FSCons
   private volatile long serverDefaultsLastUpdate;
   static Random r = new Random();
   final String clientName;
-  final LeaseChecker leasechecker = new LeaseChecker();
   Configuration conf;
   long defaultBlockSize;
   private short defaultReplication;
   SocketFactory socketFactory;
   int socketTimeout;
   final int writePacketSize;
+  final DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
+  final LeaseChecker leasechecker;
 
   /**
    * The locking hierarchy is to first acquire lock on DFSClient object, followed by 
@@ -248,8 +250,11 @@ public class DFSClient implements FSCons
     this.writePacketSize = 
       conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
                   DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+    this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
+
     // The hdfsTimeout is currently the same as the ipc timeout 
     this.hdfsTimeout = Client.getTimeout(conf);
+    this.leasechecker = new LeaseChecker(hdfsTimeout);
 
     this.ugi = UserGroupInformation.getCurrentUser();
     
@@ -570,8 +575,9 @@ public class DFSClient implements FSCons
                              int buffersize)
       throws IOException {
     return create(src, FsPermission.getDefault(),
-        overwrite ? EnumSet.of(CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE), 
-        replication, blockSize, progress, buffersize);
+        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+            : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
+        buffersize);
   }
 
   /**
@@ -637,9 +643,29 @@ public class DFSClient implements FSCons
   }
   
   /**
+   * Append to an existing file if {@link CreateFlag#APPEND} is present
+   */
+  private OutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag,
+      int buffersize, Progressable progress) throws IOException {
+    if (flag.contains(CreateFlag.APPEND)) {
+      HdfsFileStatus stat = getFileInfo(src);
+      if (stat == null) { // No file to append to
+        // New file needs to be created if create option is present
+        if (!flag.contains(CreateFlag.CREATE)) {
+          throw new FileNotFoundException("failed to append to non-existent file "
+              + src + " on client " + clientName);
+        }
+        return null;
+      }
+      return callAppend(stat, src, buffersize, progress);
+    }
+    return null;
+  }
+  
+  /**
    * Same as {{@link #create(String, FsPermission, EnumSet, short, long,
    *  Progressable, int)} except that the permission
-   *   is absolute (ie has already been masked with umask.
+   *  is absolute (ie has already been masked with umask).
    */
   public OutputStream primitiveCreate(String src, 
                              FsPermission absPermission,
@@ -652,9 +678,13 @@ public class DFSClient implements FSCons
                              int bytesPerChecksum)
       throws IOException, UnresolvedLinkException {
     checkOpen();
-    OutputStream result = new DFSOutputStream(this, src, absPermission,
-        flag, createParent, replication, blockSize, progress, buffersize,
-        bytesPerChecksum);
+    CreateFlag.validate(flag);
+    OutputStream result = primitiveAppend(src, flag, buffersize, progress);
+    if (result == null) {
+      result = new DFSOutputStream(this, src, absPermission,
+          flag, createParent, replication, blockSize, progress, buffersize,
+          bytesPerChecksum);
+    }
     leasechecker.put(src, result);
     return result;
   }
@@ -696,23 +726,11 @@ public class DFSClient implements FSCons
     }
   }
 
-  /**
-   * Append to an existing HDFS file.  
-   * 
-   * @param src file name
-   * @param buffersize buffer size
-   * @param progress for reporting write-progress
-   * @return an output stream for writing into the file
-   * 
-   * @see ClientProtocol#append(String, String) 
-   */
-  OutputStream append(String src, int buffersize, Progressable progress)
-      throws IOException {
-    checkOpen();
-    HdfsFileStatus stat = null;
+  /** Method to get stream returned by append call */
+  private OutputStream callAppend(HdfsFileStatus stat, String src,
+      int buffersize, Progressable progress) throws IOException {
     LocatedBlock lastBlock = null;
     try {
-      stat = getFileInfo(src);
       lastBlock = namenode.append(src, clientName);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
@@ -722,9 +740,26 @@ public class DFSClient implements FSCons
                                      UnsupportedOperationException.class,
                                      UnresolvedPathException.class);
     }
-    OutputStream result = new DFSOutputStream(this, src, buffersize, progress,
+    return new DFSOutputStream(this, src, buffersize, progress,
         lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 
                                      DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+  }
+  
+  /**
+   * Append to an existing HDFS file.  
+   * 
+   * @param src file name
+   * @param buffersize buffer size
+   * @param progress for reporting write-progress
+   * @return an output stream for writing into the file
+   * 
+   * @see ClientProtocol#append(String, String) 
+   */
+  OutputStream append(String src, int buffersize, Progressable progress)
+      throws IOException {
+    checkOpen();
+    HdfsFileStatus stat = getFileInfo(src);
+    OutputStream result = callAppend(stat, src, buffersize, progress);
     leasechecker.put(src, result);
     return result;
   }
@@ -1325,38 +1360,106 @@ public class DFSClient implements FSCons
     }
   }
 
-  boolean isLeaseCheckerStarted() {
-    return leasechecker.daemon != null;
-  }
-
   /** Lease management*/
-  class LeaseChecker implements Runnable {
+  class LeaseChecker {
+    static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+    static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
     /** A map from src -> DFSOutputStream of files that are currently being
      * written by this client.
      */
     private final SortedMap<String, OutputStream> pendingCreates
         = new TreeMap<String, OutputStream>();
+    /** The time in milliseconds that the map became empty. */
+    private long emptyTime = Long.MAX_VALUE;
+    /** A fixed lease renewal time period in milliseconds */
+    private final long renewal;
 
+    /** A daemon for renewing lease */
     private Daemon daemon = null;
-    
+    /** Only the daemon with currentId should run. */
+    private int currentId = 0;
+
+    /** 
+     * A period in milliseconds that the lease renewer thread should run
+     * after the map became empty.
+     * If the map is empty for a time period longer than the grace period,
+     * the renewer should terminate.  
+     */
+    private long gracePeriod;
+    /**
+     * The time period in milliseconds
+     * that the renewer sleeps for each iteration. 
+     */
+    private volatile long sleepPeriod;
+
+    private LeaseChecker(final long timeout) {
+      this.renewal = (timeout > 0 && timeout < LEASE_SOFTLIMIT_PERIOD)? 
+          timeout/2: LEASE_SOFTLIMIT_PERIOD/2;
+      setGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+    }
+
+    /** Set the grace period and adjust the sleep period accordingly. */
+    void setGraceSleepPeriod(final long gracePeriod) {
+      if (gracePeriod < 100L) {
+        throw new HadoopIllegalArgumentException(gracePeriod
+            + " = gracePeriod < 100ms is too small.");
+      }
+      synchronized(this) {
+        this.gracePeriod = gracePeriod;
+      }
+      final long half = gracePeriod/2;
+      this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
+          half: LEASE_RENEWER_SLEEP_DEFAULT;
+    }
+
+    /** Is the daemon running? */
+    synchronized boolean isRunning() {
+      return daemon != null && daemon.isAlive();
+    }
+
+    /** Is the empty period longer than the grace period? */  
+    private synchronized boolean isRenewerExpired() {
+      return emptyTime != Long.MAX_VALUE
+          && System.currentTimeMillis() - emptyTime > gracePeriod;
+    }
+
     synchronized void put(String src, OutputStream out) {
       if (clientRunning) {
-        if (daemon == null) {
-          daemon = new Daemon(this);
+        if (daemon == null || isRenewerExpired()) {
+          //start a new daemon with a new id.
+          final int id = ++currentId;
+          daemon = new Daemon(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                LeaseChecker.this.run(id);
+              } catch(InterruptedException e) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug(LeaseChecker.this.getClass().getSimpleName()
+                      + " is interrupted.", e);
+                }
+              }
+            }
+          });
           daemon.start();
         }
         pendingCreates.put(src, out);
+        emptyTime = Long.MAX_VALUE;
       }
     }
     
     synchronized void remove(String src) {
       pendingCreates.remove(src);
+      if (pendingCreates.isEmpty() && emptyTime == Long.MAX_VALUE) {
+        //discover the first time that the map is empty.
+        emptyTime = System.currentTimeMillis();
+      }
     }
     
     void interruptAndJoin() throws InterruptedException {
       Daemon daemonCopy = null;
       synchronized (this) {
-        if (daemon != null) {
+        if (isRunning()) {
           daemon.interrupt();
           daemonCopy = daemon;
         }
@@ -1423,37 +1526,30 @@ public class DFSClient implements FSCons
      * Periodically check in with the namenode and renew all the leases
      * when the lease period is half over.
      */
-    public void run() {
-      long lastRenewed = 0;
-      int renewal = (int)(LEASE_SOFTLIMIT_PERIOD / 2);
-      if (hdfsTimeout > 0) {
-        renewal = Math.min(renewal, hdfsTimeout/2);
-      }
-      while (clientRunning && !Thread.interrupted()) {
-        if (System.currentTimeMillis() - lastRenewed > renewal) {
+    private void run(final int id) throws InterruptedException {
+      for(long lastRenewed = System.currentTimeMillis();
+          clientRunning && !Thread.interrupted();
+          Thread.sleep(sleepPeriod)) {
+        if (System.currentTimeMillis() - lastRenewed >= renewal) {
           try {
             renew();
             lastRenewed = System.currentTimeMillis();
           } catch (SocketTimeoutException ie) {
-            LOG.warn("Problem renewing lease for " + clientName +
-                     " for a period of " + (hdfsTimeout/1000) +
-                     " seconds. Shutting down HDFS client...", ie);
+            LOG.warn("Failed to renew lease for " + clientName + " for "
+                + (renewal/1000) + " seconds.  Aborting ...", ie);
             abort();
             break;
           } catch (IOException ie) {
-            LOG.warn("Problem renewing lease for " + clientName +
-                     " for a period of " + (hdfsTimeout/1000) +
-                     " seconds. Will retry shortly...", ie);
+            LOG.warn("Failed to renew lease for " + clientName + " for "
+                + (renewal/1000) + " seconds.  Will retry shortly ...", ie);
           }
         }
 
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(this + " is interrupted.", ie);
+        synchronized(this) {
+          if (id != currentId || isRenewerExpired()) {
+            //no longer the current daemon or expired
+            return;
           }
-          return;
         }
       }
     }
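
A small sketch of the CreateFlag combinations the reworked create/append path above expects (HDFS-1831): OVERWRITE is now paired with CREATE, and APPEND falls back to a plain create only when CREATE is also present; the class below is illustrative, only the flag semantics come from this patch:

    import java.util.EnumSet;
    import org.apache.hadoop.fs.CreateFlag;

    public class CreateFlagSketch {
      public static void main(String[] args) {
        // "overwrite" is now expressed as CREATE + OVERWRITE (see DFSClient.create above)
        EnumSet<CreateFlag> overwrite = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
        // append-or-create: primitiveAppend(..) appends if the file exists and,
        // since CREATE is present, lets primitiveCreate(..) create it otherwise
        EnumSet<CreateFlag> appendOrCreate = EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND);
        // append-only: primitiveAppend(..) throws FileNotFoundException when the file is missing
        EnumSet<CreateFlag> appendOnly = EnumSet.of(CreateFlag.APPEND);

        // primitiveCreate(..) rejects invalid combinations up front via CreateFlag.validate(..)
        CreateFlag.validate(overwrite);
        CreateFlag.validate(appendOrCreate);
        System.out.println(overwrite + " " + appendOrCreate + " " + appendOnly);
      }
    }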

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Apr 20 21:00:45 2011
@@ -40,6 +40,10 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
   public static final String  DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
   public static final int     DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
+  public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
+  public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
+  public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
+  public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
   
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Apr 20 21:00:45 2011
@@ -31,8 +31,10 @@ import java.net.Socket;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
@@ -96,9 +98,6 @@ import org.apache.hadoop.util.StringUtil
  * starts sending packets from the dataQueue.
 ****************************************************************/
 class DFSOutputStream extends FSOutputSummer implements Syncable {
-  /**
-   * 
-   */
   private final DFSClient dfsClient;
   private Configuration conf;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
@@ -295,10 +294,18 @@ class DFSOutputStream extends FSOutputSu
     private BlockConstructionStage stage;  // block construction stage
     private long bytesSent = 0; // number of bytes that've been sent
 
+    /** Nodes have been used in the pipeline before and have failed. */
+    private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
+    /** Has the current block been hflushed? */
+    private boolean isHflushed = false;
+    /** Append on an existing block? */
+    private final boolean isAppend;
+
     /**
      * Default construction for file create
      */
     private DataStreamer() {
+      isAppend = false;
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
     }
     
@@ -311,6 +318,7 @@ class DFSOutputStream extends FSOutputSu
      */
     private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
         int bytesPerChecksum) throws IOException {
+      isAppend = true;
       stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
@@ -750,6 +758,105 @@ class DFSOutputStream extends FSOutputSu
       return doSleep;
     }
 
+    private void setHflush() {
+      isHflushed = true;
+    }
+
+    private int findNewDatanode(final DatanodeInfo[] original
+        ) throws IOException {
+      if (nodes.length != original.length + 1) {
+        throw new IOException("Failed to add a datanode:"
+            + " nodes.length != original.length + 1, nodes="
+            + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
+      }
+      for(int i = 0; i < nodes.length; i++) {
+        int j = 0;
+        for(; j < original.length && !nodes[i].equals(original[j]); j++);
+        if (j == original.length) {
+          return i;
+        }
+      }
+      throw new IOException("Failed: new datanode not found: nodes="
+          + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
+    }
+
+    private void addDatanode2ExistingPipeline() throws IOException {
+      if (DataTransferProtocol.LOG.isDebugEnabled()) {
+        DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
+      }
+      /*
+       * Is data transfer necessary?  We have the following cases.
+       * 
+       * Case 1: Failure in Pipeline Setup
+       * - Append
+       *    + Transfer the stored replica, which may be a RBW or a finalized.
+       * - Create
+       *    + If no data, then no transfer is required.
+       *    + If there are data written, transfer RBW. This case may happen
+       *      when there were streaming failures earlier in this pipeline.
+       *
+       * Case 2: Failure in Streaming
+       * - Append/Create:
+       *    + transfer RBW
+       * 
+       * Case 3: Failure in Close
+       * - Append/Create:
+       *    + no transfer; let the NameNode replicate the block.
+       */
+      if (!isAppend && lastAckedSeqno < 0
+          && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
+        //no data have been written
+        return;
+      } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
+          || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+        //pipeline is closing
+        return;
+      }
+
+      //get a new datanode
+      final DatanodeInfo[] original = nodes;
+      final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
+          src, block, nodes, failed.toArray(new DatanodeInfo[failed.size()]),
+          1, dfsClient.clientName);
+      nodes = lb.getLocations();
+
+      //find the new datanode
+      final int d = findNewDatanode(original);
+
+      //transfer replica
+      final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
+      final DatanodeInfo[] targets = {nodes[d]};
+      transfer(src, targets, lb.getBlockToken());
+    }
+
+    private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
+        final Token<BlockTokenIdentifier> blockToken) throws IOException {
+      //transfer replica to the new datanode
+      Socket sock = null;
+      DataOutputStream out = null;
+      DataInputStream in = null;
+      try {
+        sock = createSocketForPipeline(src, 2, dfsClient);
+        final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
+        out = new DataOutputStream(new BufferedOutputStream(
+            NetUtils.getOutputStream(sock, writeTimeout),
+            DataNode.SMALL_BUFFER_SIZE));
+
+        //send the TRANSFER_BLOCK request
+        DataTransferProtocol.Sender.opTransferBlock(out, block,
+            dfsClient.clientName, targets, blockToken);
+
+        //ack
+        in = new DataInputStream(NetUtils.getInputStream(sock));
+        if (SUCCESS != DataTransferProtocol.Status.read(in)) {
+          throw new IOException("Failed to add a datanode");
+        }
+      } finally {
+        IOUtils.closeStream(in);
+        IOUtils.closeStream(out);
+        IOUtils.closeSocket(sock);
+      }
+    }
 
     /**
      * Open a DataOutputStream to a DataNode pipeline so that 
@@ -793,6 +900,8 @@ class DFSOutputStream extends FSOutputSu
           DFSClient.LOG.warn("Error Recovery for block " + block +
               " in pipeline " + pipelineMsg + 
               ": bad datanode " + nodes[errorIndex].getName());
+          failed.add(nodes[errorIndex]);
+
           DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
           System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
           System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
@@ -803,6 +912,12 @@ class DFSOutputStream extends FSOutputSu
           errorIndex = -1;
         }
 
+        // Check if replace-datanode policy is satisfied.
+        if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
+            nodes, isAppend, isHflushed)) {
+          addDatanode2ExistingPipeline();
+        }
+
         // get a new generation stamp and an access token
         LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
         newGS = lb.getBlock().getGenerationStamp();
@@ -888,7 +1003,7 @@ class DFSOutputStream extends FSOutputSu
 
       boolean result = false;
       try {
-        s = createSocketForPipeline(nodes, dfsClient);
+        s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
         long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
 
         //
@@ -1026,18 +1141,19 @@ class DFSOutputStream extends FSOutputSu
 
   /**
    * Create a socket for a write pipeline
-   * @param datanodes the datanodes on the pipeline 
+   * @param first the first datanode 
+   * @param length the pipeline length
    * @param client
    * @return the socket connected to the first datanode
    */
-  static Socket createSocketForPipeline(final DatanodeInfo[] datanodes,
-      final DFSClient client) throws IOException {
+  static Socket createSocketForPipeline(final DatanodeInfo first,
+      final int length, final DFSClient client) throws IOException {
     if(DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Connecting to datanode " + datanodes[0].getName());
+      DFSClient.LOG.debug("Connecting to datanode " + first.getName());
     }
-    final InetSocketAddress isa = NetUtils.createSocketAddr(datanodes[0].getName());
+    final InetSocketAddress isa = NetUtils.createSocketAddr(first.getName());
     final Socket sock = client.socketFactory.createSocket();
-    final int timeout = client.getDatanodeReadTimeout(datanodes.length);
+    final int timeout = client.getDatanodeReadTimeout(length);
     NetUtils.connect(sock, isa, timeout);
     sock.setSoTimeout(timeout);
     sock.setSendBufferSize(DFSClient.DEFAULT_DATA_SOCKET_SIZE);
@@ -1363,6 +1479,12 @@ class DFSOutputStream extends FSOutputSu
           throw ioe;
         }
       }
+
+      synchronized(this) {
+        if (streamer != null) {
+          streamer.setHflush();
+        }
+      }
     } catch (InterruptedIOException interrupt) {
       // This kind of error doesn't mean that the stream itself is broken - just the
       // flushing thread got interrupted. So, we shouldn't close down the writer,
@@ -1577,7 +1699,7 @@ class DFSOutputStream extends FSOutputSu
   /**
    * Returns the access token currently used by streamer, for testing only
    */
-  Token<BlockTokenIdentifier> getBlockToken() {
+  synchronized Token<BlockTokenIdentifier> getBlockToken() {
     return streamer.getBlockToken();
   }
 

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Wed Apr 20 21:00:45 2011
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -242,9 +243,9 @@ public class DistributedFileSystem exten
     Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
     return new FSDataOutputStream(dfs.create(getPathName(f), permission,
-        overwrite ? EnumSet.of(CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE),
-        replication, blockSize, progress, bufferSize),
-        statistics);
+        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+            : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
+        bufferSize), statistics);
   }
   
   @SuppressWarnings("deprecation")
@@ -266,6 +267,9 @@ public class DistributedFileSystem exten
       EnumSet<CreateFlag> flag, int bufferSize, short replication,
       long blockSize, Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
+    if (flag.contains(CreateFlag.OVERWRITE)) {
+      flag.add(CreateFlag.CREATE);
+    }
     return new FSDataOutputStream(dfs.create(getPathName(f), permission, flag,
         false, replication, blockSize, progress, bufferSize), statistics);
   }
@@ -810,6 +814,14 @@ public class DistributedFileSystem exten
       throws IOException {
     return dfs.getDelegationToken(renewer);
   }
+  
+  @Override // FileSystem
+  public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
+    List<Token<?>> tokenList = new ArrayList<Token<?>>();
+    Token<DelegationTokenIdentifier> token = this.getDelegationToken(renewer);
+    tokenList.add(token);
+    return tokenList;
+  }
 
   /**
    * Renew an existing delegation token.

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Apr 20 21:00:45 2011
@@ -67,9 +67,9 @@ public interface ClientProtocol extends 
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 66: Add block pool ID to Block
+   * 67: Add block pool ID to Block
    */
-  public static final long versionID = 66L;
+  public static final long versionID = 67L;
   
   ///////////////////////////////////////
   // File contents
@@ -298,6 +298,30 @@ public interface ClientProtocol extends 
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException;
 
+  /** 
+   * Get a datanode for an existing pipeline.
+   * 
+   * @param src the file being written
+   * @param blk the block being written
+   * @param existings the existing nodes in the pipeline
+   * @param excludes the excluded nodes
+   * @param numAdditionalNodes number of additional datanodes
+   * @param clientName the name of the client
+   * 
+   * @return the located block.
+   * 
+   * @throws AccessControlException If access is denied
+   * @throws FileNotFoundException If file <code>src</code> is not found
+   * @throws SafeModeException create not allowed in safemode
+   * @throws UnresolvedLinkException If <code>src</code> contains a symlink
+   * @throws IOException If an I/O error occurred
+   */
+  public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws AccessControlException, FileNotFoundException,
+          SafeModeException, UnresolvedLinkException, IOException;
+
   /**
    * The client is done writing data to the given filename, and would 
    * like to complete it.  
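
A compilable sketch of how a client would use the new getAdditionalDatanode RPC described above; it assumes an already constructed ClientProtocol proxy and the pipeline state named in the parameters, and simply mirrors the call made by DFSOutputStream.addDatanode2ExistingPipeline(..) earlier in this commit:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    public class GetAdditionalDatanodeSketch {
      /** Ask the NameNode for one extra datanode for an existing pipeline. */
      static DatanodeInfo[] addOneDatanode(ClientProtocol namenode, String src,
          ExtendedBlock blk, DatanodeInfo[] existings, DatanodeInfo[] failed,
          String clientName) throws IOException {
        // the failed datanodes are passed as the exclude list
        LocatedBlock lb = namenode.getAdditionalDatanode(
            src, blk, existings, failed, 1, clientName);
        return lb.getLocations();  // the new pipeline, including the added datanode
      }
    }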

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Wed Apr 20 21:00:45 2011
@@ -25,8 +25,13 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -39,18 +44,18 @@ import org.apache.hadoop.security.token.
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public interface DataTransferProtocol {
-  
+  public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class);
   
   /** Version for data transfers between clients and datanodes
    * This should change when serialization of DatanodeInfo, not just
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 22:
+   * Version 23:
    *    Changed the protocol methods to use ExtendedBlock instead
    *    of Block.
    */
-  public static final int DATA_TRANSFER_VERSION = 21;
+  public static final int DATA_TRANSFER_VERSION = 23;
 
   /** Operation */
   public enum Op {
@@ -750,4 +755,93 @@ public interface DataTransferProtocol {
     }
   }
 
+  /**
+   * The setting of replace-datanode-on-failure feature.
+   */
+  public enum ReplaceDatanodeOnFailure {
+    /** The feature is disabled in the entire site. */
+    DISABLE,
+    /** Never add a new datanode. */
+    NEVER,
+    /**
+     * DEFAULT policy:
+     *   Let r be the replication number.
+     *   Let n be the number of existing datanodes.
+     *   Add a new datanode only if r >= 3 and either
+     *   (1) floor(r/2) >= n; or
+     *   (2) r > n and the block is hflushed/appended.
+     */
+    DEFAULT,
+    /** Always add a new datanode when an existing datanode is removed. */
+    ALWAYS;
+
+    /** Check if the feature is enabled. */
+    public void checkEnabled() {
+      if (this == DISABLE) {
+        throw new UnsupportedOperationException(
+            "This feature is disabled.  Please refer to "
+            + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
+            + " configuration property.");
+      }
+    }
+
+    /** Is the policy satisfied? */
+    public boolean satisfy(
+        final short replication, final DatanodeInfo[] existings,
+        final boolean isAppend, final boolean isHflushed) {
+      final int n = existings == null? 0: existings.length;
+      if (n == 0 || n >= replication) {
+        //don't need to add datanode for any policy.
+        return false;
+      } else if (this == DISABLE || this == NEVER) {
+        return false;
+      } else if (this == ALWAYS) {
+        return true;
+      } else {
+        //DEFAULT
+        if (replication < 3) {
+          return false;
+        } else {
+          if (n <= (replication/2)) {
+            return true;
+          } else {
+            return isAppend || isHflushed;
+          }
+        }
+      }
+    }
+
+    /** Get the setting from configuration. */
+    public static ReplaceDatanodeOnFailure get(final Configuration conf) {
+      final boolean enabled = conf.getBoolean(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
+      if (!enabled) {
+        return DISABLE;
+      }
+
+      final String policy = conf.get(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
+      for(int i = 1; i < values().length; i++) {
+        final ReplaceDatanodeOnFailure rdof = values()[i];
+        if (rdof.name().equalsIgnoreCase(policy)) {
+          return rdof;
+        }
+      }
+      throw new HadoopIllegalArgumentException("Illegal configuration value for "
+          + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY
+          + ": " + policy);
+    }
+
+    /** Write the setting to configuration. */
+    public void write(final Configuration conf) {
+      conf.setBoolean(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+          this != DISABLE);
+      conf.set(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+          name());
+    }
+  }
 }
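
As a rough illustration of the ReplaceDatanodeOnFailure setting introduced
above: a small sketch of how a client might resolve the policy from its
Configuration and decide whether to request a replacement datanode. The
wrapper class and method below are hypothetical; only the calls on the enum
mirror the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.ReplaceDatanodeOnFailure;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    /** Illustrative wrapper, not part of this commit. */
    class ReplaceDatanodePolicyExample {
      /** Should the writer ask the namenode for a replacement datanode? */
      static boolean shouldAddDatanode(Configuration conf, short replication,
          DatanodeInfo[] existings, boolean isAppend, boolean isHflushed) {
        // Resolves DISABLE/NEVER/DEFAULT/ALWAYS from the
        // DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE enable/policy keys.
        final ReplaceDatanodeOnFailure policy = ReplaceDatanodeOnFailure.get(conf);

        // Under DEFAULT with replication 3: a single surviving datanode gives
        // floor(3/2) = 1 >= 1, so a replacement is requested; with two
        // survivors one is requested only for hflushed or appended blocks.
        return policy.satisfy(replication, existings, isAppend, isHflushed);
      }
    }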

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Wed Apr 20 21:00:45 2011
@@ -88,7 +88,7 @@ public interface FSConstants {
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -29;
+  public static final int LAYOUT_VERSION = -31;
   // Current version: 
-  // -29: Adding support for block pools and multiple namenodes
+  // -31: Adding support for block pools and multiple namenodes
 }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Wed Apr 20 21:00:45 2011
@@ -79,7 +79,7 @@ public abstract class Storage extends St
   public static final int PRE_RBW_LAYOUT_VERSION = -19;
   
   // last layout version that is before federation
-  public static final int LAST_PRE_FEDERATION_LAYOUT_VERSION = -28;
+  public static final int LAST_PRE_FEDERATION_LAYOUT_VERSION = -30;
   
   private   static final String STORAGE_FILE_LOCK     = "in_use.lock";
   protected static final String STORAGE_FILE_VERSION  = "VERSION";

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Apr 20 21:00:45 2011
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.pro
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 import java.io.BufferedOutputStream;
+import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -56,7 +57,7 @@ import org.apache.hadoop.util.StringUtil
  * may copy it to another site. If a throttler is provided,
  * streaming throttling is also supported.
  **/
-class BlockReceiver implements java.io.Closeable, FSConstants {
+class BlockReceiver implements Closeable, FSConstants {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
@@ -652,7 +653,7 @@ class BlockReceiver implements java.io.C
       DataInputStream mirrIn,   // input from next datanode
       DataOutputStream replyOut,  // output to previous datanode
       String mirrAddr, DataTransferThrottler throttlerArg,
-      int numTargets) throws IOException {
+      DatanodeInfo[] downstreams) throws IOException {
 
       boolean responderClosed = false;
       mirrorOut = mirrOut;
@@ -662,9 +663,8 @@ class BlockReceiver implements java.io.C
     try {
       if (isClient && !isTransfer) {
         responder = new Daemon(datanode.threadGroup, 
-            new PacketResponder(this, block, mirrIn, replyOut, 
-                                numTargets, Thread.currentThread()));
-        responder.start(); // start thread to processes reponses
+            new PacketResponder(replyOut, mirrIn, downstreams));
+        responder.start(); // start thread to process responses
       }
 
       /* 
@@ -700,8 +700,7 @@ class BlockReceiver implements java.io.C
       }
 
     } catch (IOException ioe) {
-      LOG.info("Exception in receiveBlock for block " + block + 
-               " " + ioe);
+      LOG.info("Exception in receiveBlock for " + block, ioe);
       throw ioe;
     } finally {
       if (!responderClosed) { // Abnormal termination of the flow above
@@ -808,51 +807,71 @@ class BlockReceiver implements java.io.C
     }
   }
   
+  private static enum PacketResponderType {
+    NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
+  }
   
   /**
   * Processes responses from downstream datanodes in the pipeline
    * and sends back replies to the originator.
    */
-  class PacketResponder implements Runnable, FSConstants {   
+  class PacketResponder implements Runnable, Closeable, FSConstants {   
 
-    //packet waiting for ack
-    private LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
+    /** queue for packets waiting for ack */
+    private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
+    /** the thread that spawns this responder */
+    private final Thread receiverThread = Thread.currentThread();
+    /** is this responder running? */
     private volatile boolean running = true;
-    private ExtendedBlock block;
-    DataInputStream mirrorIn;   // input from downstream datanode
-    DataOutputStream replyOut;  // output to upstream datanode
-    private int numTargets;     // number of downstream datanodes including myself
-    private BlockReceiver receiver; // The owner of this responder.
-    private Thread receiverThread; // the thread that spawns this responder
 
+    /** input from the next downstream datanode */
+    private final DataInputStream downstreamIn;
+    /** output to upstream datanode/client */
+    private final DataOutputStream upstreamOut;
+
+    /** The type of this responder */
+    private final PacketResponderType type;
+    /** for log and error messages */
+    private final String myString; 
+
+    @Override
     public String toString() {
-      return "PacketResponder " + numTargets + " for Block " + this.block;
+      return myString;
     }
 
-    PacketResponder(BlockReceiver receiver, ExtendedBlock b, DataInputStream in, 
-                    DataOutputStream out, int numTargets,
-                    Thread receiverThread) {
-      this.receiverThread = receiverThread;
-      this.receiver = receiver;
-      this.block = b;
-      mirrorIn = in;
-      replyOut = out;
-      this.numTargets = numTargets;
+    PacketResponder(final DataOutputStream upstreamOut,
+        final DataInputStream downstreamIn,
+        final DatanodeInfo[] downstreams) {
+      this.downstreamIn = downstreamIn;
+      this.upstreamOut = upstreamOut;
+
+      this.type = downstreams == null? PacketResponderType.NON_PIPELINE
+          : downstreams.length == 0? PacketResponderType.LAST_IN_PIPELINE
+              : PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE;
+
+      final StringBuilder b = new StringBuilder(getClass().getSimpleName())
+          .append(": ").append(block).append(", type=").append(type);
+      if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
+        b.append(", downstreams=").append(downstreams.length)
+            .append(":").append(Arrays.asList(downstreams));
+      }
+      this.myString = b.toString();
     }
 
     /**
     * enqueue the seqno that is still to be acked by the downstream datanode.
      * @param seqno
      * @param lastPacketInBlock
-     * @param lastByteInPacket
+     * @param offsetInBlock
      */
-    synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
+    synchronized void enqueue(final long seqno,
+        final boolean lastPacketInBlock, final long offsetInBlock) {
       if (running) {
+        final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock);
         if(LOG.isDebugEnabled()) {
-          LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
-                    " to ack queue.");
+          LOG.debug(myString + ": enqueue " + p);
         }
-        ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
+        ackQueue.addLast(p);
         notifyAll();
       }
     }
@@ -860,7 +879,8 @@ class BlockReceiver implements java.io.C
     /**
      * wait for all pending packets to be acked. Then shutdown thread.
      */
-    synchronized void close() {
+    @Override
+    public synchronized void close() {
       while (running && ackQueue.size() != 0 && datanode.shouldRun) {
         try {
           wait();
@@ -869,8 +889,7 @@ class BlockReceiver implements java.io.C
         }
       }
       if(LOG.isDebugEnabled()) {
-        LOG.debug("PacketResponder " + numTargets +
-                 " for block " + block + " Closing down.");
+        LOG.debug(myString + ": closing");
       }
       running = false;
       notifyAll();
@@ -892,21 +911,21 @@ class BlockReceiver implements java.io.C
             PipelineAck ack = new PipelineAck();
             long seqno = PipelineAck.UNKOWN_SEQNO;
             try {
-              if (numTargets != 0 && !mirrorError) {// not the last DN & no mirror error
+              if (type != PacketResponderType.LAST_IN_PIPELINE
+                  && !mirrorError) {
                 // read an ack from downstream datanode
-                ack.readFields(mirrorIn);
+                ack.readFields(downstreamIn);
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("PacketResponder " + numTargets + " got " + ack);
+                  LOG.debug(myString + " got " + ack);
                 }
                 seqno = ack.getSeqno();
               }
-              if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
+              if (seqno != PipelineAck.UNKOWN_SEQNO
+                  || type == PacketResponderType.LAST_IN_PIPELINE) {
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
                     if (LOG.isDebugEnabled()) {
-                      LOG.debug("PacketResponder " + numTargets + 
-                                " seqno = " + seqno +
-                                " for block " + block +
+                      LOG.debug(myString + ": seqno=" + seqno +
                                 " waiting for local datanode to finish write.");
                     }
                     wait();
@@ -916,11 +935,10 @@ class BlockReceiver implements java.io.C
                   }
                   pkt = ackQueue.getFirst();
                   expected = pkt.seqno;
-                  if (numTargets > 0 && seqno != expected) {
-                    throw new IOException("PacketResponder " + numTargets +
-                                          " for block " + block +
-                                          " expected seqno:" + expected +
-                                          " received:" + seqno);
+                  if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
+                      && seqno != expected) {
+                    throw new IOException(myString + " seqno: expected="
+                        + expected + ", received=" + seqno);
                   }
                   lastPacketInBlock = pkt.lastPacketInBlock;
                 }
@@ -935,8 +953,7 @@ class BlockReceiver implements java.io.C
                 // notify client of the error
                 // and wait for the client to shut down the pipeline
                 mirrorError = true;
-                LOG.info("PacketResponder " + block + " " + numTargets + 
-                      " Exception " + StringUtils.stringifyException(ioe));
+                LOG.info(myString, ioe);
               }
             }
 
@@ -948,8 +965,7 @@ class BlockReceiver implements java.io.C
                * because this datanode has a problem. The upstream datanode
                * will detect that this datanode is bad, and rightly so.
                */
-              LOG.info("PacketResponder " + block +  " " + numTargets +
-                       " : Thread is interrupted.");
+              LOG.info(myString + ": Thread is interrupted.");
               running = false;
               continue;
             }
@@ -957,7 +973,7 @@ class BlockReceiver implements java.io.C
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
             if (lastPacketInBlock) {
-              receiver.close();
+              BlockReceiver.this.close();
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
               block.setNumBytes(replicaInfo.getNumBytes());
               datanode.data.finalizeBlock(block);
@@ -967,13 +983,12 @@ class BlockReceiver implements java.io.C
                 DatanodeRegistration dnR = 
                   datanode.getDNRegistrationForBP(block.getBlockPoolId());
                 ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                      receiver.inAddr, receiver.myAddr, block.getNumBytes(),
-                      "HDFS_WRITE", receiver.clientname, offset,
+                      inAddr, myAddr, block.getNumBytes(),
+                      "HDFS_WRITE", clientname, offset,
                       dnR.getStorageID(), block, endTime-startTime));
               } else {
-                LOG.info("Received block " + block + 
-                         " of size " + block.getNumBytes() + 
-                         " from " + receiver.inAddr);
+                LOG.info("Received block " + block + " of size "
+                    + block.getNumBytes() + " from " + inAddr);
               }
             }
 
@@ -984,7 +999,8 @@ class BlockReceiver implements java.io.C
               replies[0] = SUCCESS;
               replies[1] = ERROR;
             } else {
-              short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+              short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
+                  : ack.getNumOfReplies();
               replies = new Status[1+ackLen];
               replies[0] = SUCCESS;
               for (int i=0; i<ackLen; i++) {
@@ -994,20 +1010,18 @@ class BlockReceiver implements java.io.C
             PipelineAck replyAck = new PipelineAck(expected, replies);
             
             // send my ack back to upstream datanode
-            replyAck.write(replyOut);
-            replyOut.flush();
+            replyAck.write(upstreamOut);
+            upstreamOut.flush();
             if (LOG.isDebugEnabled()) {
-              LOG.debug("PacketResponder " + numTargets + 
-                        " for block " + block +
-                        " responded an ack: " + replyAck);
+              LOG.debug(myString + ", replyAck=" + replyAck);
             }
             if (pkt != null) {
               // remove the packet from the ack queue
               removeAckHead();
               // update bytes acked
               if (replyAck.isSuccess() && 
-                  pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
-                replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+                  pkt.offsetInBlock > replicaInfo.getBytesAcked()) {
+                replicaInfo.setBytesAcked(pkt.offsetInBlock);
               }
             }
         } catch (IOException e) {
@@ -1018,8 +1032,7 @@ class BlockReceiver implements java.io.C
             } catch (IOException ioe) {
               LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
             }
-            LOG.info("PacketResponder " + block + " " + numTargets + 
-                     " Exception " + StringUtils.stringifyException(e));
+            LOG.info(myString, e);
             running = false;
             if (!Thread.interrupted()) { // failure not caused by interruption
               receiverThread.interrupt();
@@ -1027,15 +1040,13 @@ class BlockReceiver implements java.io.C
           }
         } catch (Throwable e) {
           if (running) {
-            LOG.info("PacketResponder " + block + " " + numTargets + 
-                     " Exception " + StringUtils.stringifyException(e));
+            LOG.info(myString, e);
             running = false;
             receiverThread.interrupt();
           }
         }
       }
-      LOG.info("PacketResponder " + numTargets + 
-               " for block " + block + " terminating");
+      LOG.info(myString + " terminating");
     }
     
     /**
@@ -1052,15 +1063,23 @@ class BlockReceiver implements java.io.C
   /**
    * This information is cached by the Datanode in the ackQueue.
    */
-  static private class Packet {
-    long seqno;
-    boolean lastPacketInBlock;
-    long lastByteInBlock;
+  private static class Packet {
+    final long seqno;
+    final boolean lastPacketInBlock;
+    final long offsetInBlock;
 
-    Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
+    Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
       this.seqno = seqno;
       this.lastPacketInBlock = lastPacketInBlock;
-      this.lastByteInBlock = lastByteInPacket;
+      this.offsetInBlock = offsetInBlock;
+    }
+
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "(seqno=" + seqno
+        + ", lastPacketInBlock=" + lastPacketInBlock
+        + ", offsetInBlock=" + offsetInBlock
+        + ")";
     }
   }
 }
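
For readers following the PacketResponder changes above: a compact,
self-contained restatement of how the responder now classifies its position
in the pipeline from the downstream target list, instead of the old
numTargets counter. The standalone class below is illustrative only; the
three type names mirror the patch.

    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    /** Illustrative restatement, not part of this commit. */
    class PacketResponderTypeExample {
      enum Type { NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE }

      /** Classify a responder from the datanodes that follow it in the pipeline. */
      static Type classify(DatanodeInfo[] downstreams) {
        return downstreams == null ? Type.NON_PIPELINE          // e.g. a balancer copy
            : downstreams.length == 0 ? Type.LAST_IN_PIPELINE   // tail of the pipeline
                : Type.HAS_DOWNSTREAM_IN_PIPELINE;              // must relay downstream acks
      }
    }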

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Apr 20 21:00:45 2011
@@ -1907,8 +1907,9 @@ public class DataNode extends Configured
      */
     DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
         final String clientname) throws IOException {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(getClass().getSimpleName() + ": " + b
+      if (DataTransferProtocol.LOG.isDebugEnabled()) {
+        DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
+            + b + " (numBytes=" + b.getNumBytes() + ")"
             + ", stage=" + stage
             + ", clientname=" + clientname
             + ", targests=" + Arrays.asList(targets));
@@ -2573,12 +2574,9 @@ public class DataNode extends Configured
    *          the stored GS and the visible length. 
    * @param targets
    * @param client
-   * @return whether the replica is an RBW
    */
-  boolean transferReplicaForPipelineRecovery(final ExtendedBlock b,
+  void transferReplicaForPipelineRecovery(final ExtendedBlock b,
       final DatanodeInfo[] targets, final String client) throws IOException {
-    checkWriteAccess(b);
-
     final long storedGS;
     final long visible;
     final BlockConstructionStage stage;
@@ -2590,7 +2588,8 @@ public class DataNode extends Configured
       } else if (data.isValidBlock(b)) {
         stage = BlockConstructionStage.TRANSFER_FINALIZED;
       } else {
-        throw new IOException(b + " is not a RBW or a Finalized");
+        final String r = data.getReplicaString(b.getBlockPoolId(), b.getBlockId());
+        throw new IOException(b + " is neither an RBW nor a Finalized replica, r=" + r);
       }
 
       storedGS = data.getStoredBlock(b.getBlockPoolId(),
@@ -2609,7 +2608,6 @@ public class DataNode extends Configured
     if (targets.length > 0) {
       new DataTransfer(targets, b, stage, client).run();
     }
-    return stage == BlockConstructionStage.TRANSFER_RBW;
   }
 
   // Determine a Datanode's streaming address

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Apr 20 21:00:45 2011
@@ -154,7 +154,7 @@ class DataXceiver extends DataTransferPr
         datanode.socketWriteTimeout);
     DataOutputStream out = new DataOutputStream(
                  new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
-    checkAccess(out, block, blockToken,
+    checkAccess(out, true, block, blockToken,
         DataTransferProtocol.Op.READ_BLOCK,
         BlockTokenSecretManager.AccessMode.READ);
   
@@ -258,7 +258,7 @@ class DataXceiver extends DataTransferPr
         new BufferedOutputStream(
             NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
             SMALL_BUFFER_SIZE));
-    checkAccess(isClient? replyOut: null, block, blockToken,
+    checkAccess(replyOut, isClient, block, blockToken,
         DataTransferProtocol.Op.WRITE_BLOCK,
         BlockTokenSecretManager.AccessMode.WRITE);
 
@@ -365,7 +365,7 @@ class DataXceiver extends DataTransferPr
       if (blockReceiver != null) {
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
         blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-            mirrorAddr, null, targets.length);
+            mirrorAddr, null, targets);
 
         // send close-ack for transfer-RBW/Finalized 
         if (isTransfer) {
@@ -419,13 +419,14 @@ class DataXceiver extends DataTransferPr
       final ExtendedBlock blk, final String client,
       final DatanodeInfo[] targets,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
-    final DataOutputStream out = new DataOutputStream(
-        NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
-    checkAccess(out, blk, blockToken,
+    checkAccess(null, true, blk, blockToken,
         DataTransferProtocol.Op.TRANSFER_BLOCK,
         BlockTokenSecretManager.AccessMode.COPY);
 
     updateCurrentThreadName(DataTransferProtocol.Op.TRANSFER_BLOCK + " " + blk);
+
+    final DataOutputStream out = new DataOutputStream(
+        NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets, client);
       SUCCESS.write(out);
@@ -442,7 +443,7 @@ class DataXceiver extends DataTransferPr
       Token<BlockTokenIdentifier> blockToken) throws IOException {
     final DataOutputStream out = new DataOutputStream(
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
-    checkAccess(out, block, blockToken,
+    checkAccess(out, true, block, blockToken,
         DataTransferProtocol.Op.BLOCK_CHECKSUM,
         BlockTokenSecretManager.AccessMode.READ);
     updateCurrentThreadName("Reading metadata for block " + block);
@@ -634,7 +635,7 @@ class DataXceiver extends DataTransferPr
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 
-          dataXceiverServer.balanceThrottler, -1);
+          dataXceiverServer.balanceThrottler, null);
                     
       // notify name node
       datanode.notifyNamenodeReceivedBlock(block, sourceID);
@@ -699,7 +700,7 @@ class DataXceiver extends DataTransferPr
     }
   }
 
-  private void checkAccess(final DataOutputStream out, 
+  private void checkAccess(DataOutputStream out, final boolean reply, 
       final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> t,
       final DataTransferProtocol.Op op,
@@ -709,7 +710,11 @@ class DataXceiver extends DataTransferPr
         datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode);
       } catch(InvalidToken e) {
         try {
-          if (out != null) {
+          if (reply) {
+            if (out == null) {
+              out = new DataOutputStream(
+                  NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+            }
             ERROR_ACCESS_TOKEN.write(out);
             if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
               DatanodeRegistration dnR = 

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Apr 20 21:00:45 2011
@@ -2369,6 +2369,12 @@ public class FSDataset implements FSCons
     return volumeMap.get(bpid, blockId);
   }
 
+  @Override 
+  public synchronized String getReplicaString(String bpid, long blockId) {
+    final Replica r = volumeMap.get(bpid, blockId);
+    return r == null? "null": r.toString();
+  }
+
   @Override // FSDatasetInterface
   public synchronized ReplicaRecoveryInfo initReplicaRecovery(
       RecoveringBlock rBlock) throws IOException {

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Wed Apr 20 21:00:45 2011
@@ -107,6 +107,11 @@ public interface FSDatasetInterface exte
   public Replica getReplica(String bpid, long blockId);
 
   /**
+   * @return replica meta information
+   */
+  public String getReplicaString(String bpid, long blockId);
+
+  /**
    * @return the generation stamp stored with the block.
    */
   public Block getStoredBlock(String bpid, long blkid)

Propchange: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 21:00:45 2011
@@ -4,4 +4,4 @@
 /hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:820487
-/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696,1090114-1095461