Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/10/01 00:57:34 UTC
svn commit: r820487 [1/6] - in /hadoop/hdfs/branches/branch-0.21: ./
.eclipse.templates/.launches/ lib/ src/contrib/block_forensics/
src/contrib/block_forensics/client/ src/contrib/block_forensics/ivy/
src/contrib/block_forensics/src/ src/contrib/block...
Author: hairong
Date: Wed Sep 30 22:57:30 2009
New Revision: 820487
URL: http://svn.apache.org/viewvc?rev=820487&view=rev
Log:
HDFS-265. Merge -r 796828:HEAD --accept theirs-full https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-265
Added:
hadoop/hdfs/branches/branch-0.21/lib/hadoop-core-0.21.0-dev.jar
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/lib/hadoop-core-0.21.0-dev.jar
hadoop/hdfs/branches/branch-0.21/lib/hadoop-core-test-0.21.0-dev.jar
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/lib/hadoop-core-test-0.21.0-dev.jar
hadoop/hdfs/branches/branch-0.21/lib/hadoop-mapred-0.21.0-dev.jar
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/lib/hadoop-mapred-0.21.0-dev.jar
hadoop/hdfs/branches/branch-0.21/lib/hadoop-mapred-examples-0.21.0-dev.jar
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/lib/hadoop-mapred-examples-0.21.0-dev.jar
hadoop/hdfs/branches/branch-0.21/lib/hadoop-mapred-test-0.21.0-dev.jar
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/lib/hadoop-mapred-test-0.21.0-dev.jar
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java
hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/HFlushAspects.aj
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/test/aop/org/apache/hadoop/hdfs/HFlushAspects.aj
hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/TestFiHFlush.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/test/aop/org/apache/hadoop/hdfs/TestFiHFlush.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
- copied unchanged from r820463, hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
Removed:
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockAlreadyExistsException.java
Modified:
hadoop/hdfs/branches/branch-0.21/ (props changed)
hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/ (props changed)
hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/AllTests.launch (props changed)
hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/DataNode.launch (props changed)
hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/NameNode.launch (props changed)
hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/SpecificTestTemplate.launch (props changed)
hadoop/hdfs/branches/branch-0.21/CHANGES.txt
hadoop/hdfs/branches/branch-0.21/build.xml (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/build.xml (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/client/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/client/BlockForensics.java (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/ivy/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/ivy.xml (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/ivy/libraries.properties (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/src/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/src/java/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/src/java/org/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/src/java/org/apache/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/src/java/org/apache/hadoop/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/src/java/org/apache/hadoop/block_forensics/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/src/java/org/apache/hadoop/block_forensics/BlockSearch.java (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/docs/src/documentation/content/xdocs/hdfs_imageviewer.xml
hadoop/hdfs/branches/branch-0.21/src/java/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/BlockMissingException.java (props changed)
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (contents, props changed)
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/protocol/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj
hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj
hadoop/hdfs/branches/branch-0.21/src/test/findbugsExcludeFile.xml
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/test/hdfs-with-mr/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java (props changed)
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java (contents, props changed)
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockReport.java (contents, props changed)
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (props changed)
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java (props changed)
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java (props changed)
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
hadoop/hdfs/branches/branch-0.21/src/webapps/datanode/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/webapps/hdfs/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/webapps/secondary/ (props changed)
Propchange: hadoop/hdfs/branches/branch-0.21/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Sep 30 22:57:30 2009
@@ -3,6 +3,5 @@
logs
.classpath
.externalToolBuilders
-.launches
.project
.settings
Propchange: hadoop/hdfs/branches/branch-0.21/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 30 22:57:30 2009
@@ -1,2 +1,3 @@
/hadoop/core/branches/branch-0.19/hdfs:713112
+/hadoop/hdfs/branches/HDFS-265:796829-820463
/hadoop/hdfs/trunk:818294-818298
Propchange: hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/
------------------------------------------------------------------------------
svn:mergeinfo =
Propchange: hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/AllTests.launch
------------------------------------------------------------------------------
svn:mime-type = text/plain
Propchange: hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/DataNode.launch
------------------------------------------------------------------------------
svn:mime-type = text/plain
Propchange: hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/NameNode.launch
------------------------------------------------------------------------------
svn:mime-type = text/plain
Propchange: hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/SpecificTestTemplate.launch
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: hadoop/hdfs/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/CHANGES.txt?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/hdfs/branches/branch-0.21/CHANGES.txt Wed Sep 30 22:57:30 2009
@@ -11,6 +11,21 @@
HDFS-602. DistributedFileSystem mkdirs throws FileAlreadyExistsException
instead of FileNotFoundException. (Boris Shkolnik via suresh)
+ HDFS-544. Add a "rbw" subdir to DataNode data directory. (hairong)
+
+ HDFS-576. Block report includes under-construction replicas. (shv)
+
+ HDFS-636. SafeMode counts complete blocks only. (shv)
+
+ HDFS-644. Lease recovery, concurrency support. (shv)
+
+ HDFS-570. Get last block length from a data-node when opening a file
+ being written to. (Tsz Wo (Nicholas), SZE via shv)
+
+ HDFS-657. Remove unused legacy data-node protocol methods. (shv)
+
+ HDFS-658. Block recovery for primary data-node. (shv)
+
NEW FEATURES
HDFS-436. Introduce AspectJ framework for HDFS code and tests.
@@ -49,6 +64,37 @@
HDFS-610. Support o.a.h.fs.FileContext. (Sanjay Radia via szetszwo)
+ HDFS-536. Support hflush at DFSClient. (hairong)
+
+ HDFS-517. Introduce BlockInfoUnderConstruction to reflect block replica
+ states while writing. (shv)
+
+ HDFS-565. Introduce block committing logic during new block allocation
+ and file close. (shv)
+
+ HDFS-537. DataNode exposes a replica's meta info to BlockReceiver for the
+ support of dfs writes/hflush. It also updates a replica's bytes received,
+ bytes on disk, and bytes acked after receiving a packet. (hairong)
+
+ HDFS-585. Datanode should serve up to visible length of a replica for read
+ requests. (szetszwo)
+
+ HDFS-604. Block report processing for append. (shv)
+
+ HDFS-619. Support replica recovery initialization in datanode for the new
+ append design. (szetszwo)
+
+ HDFS-592. Allow clients to fetch a new generation stamp from NameNode for
+ pipeline recovery. (hairong)
+
+ HDFS-624. Support a new algorithm for pipeline recovery and pipeline setup
+ for append. (hairong)
+
+ HDFS-627. Support replica update in data-node.
+ (Tsz Wo (Nicholas), SZE and Hairong Kuang via shv)
+
+ HDFS-642. Support pipeline close and close error recovery. (hairong)
+
IMPROVEMENTS
HDFS-381. Remove blocks from DataNode maps when corresponding file
@@ -173,6 +219,31 @@
HDFS-598. Eclipse launch task for HDFS. (Eli Collins via tomwhite)
+ HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.
+ (hairong)
+
+ HDFS-562. Add a test for NameNode.getBlockLocations(..) to check read from
+ un-closed file. (szetszwo)
+
+ HDFS-543. Break FSDatasetInterface#writeToBlock() into writeToTemporary,
+ writeToRBW, and append. (hairong)
+
+ HDFS-603. Add a new interface, Replica, which is going to replace the use
+ of Block in datanode. (szetszwo)
+
+ HDFS-589. Change block write protocol to support pipeline recovery.
+ (hairong)
+
+ HDFS-652. Replace BlockInfo.isUnderConstruction() with isComplete() (shv)
+
+ HDFS-648. Change some methods in AppendTestUtil to public. (Konstantin
+ Boudnik via szetszwo)
+
+ HDFS-662. Unnecessary info message from DFSClient. (hairong)
+
+ HDFS-518. Create new tests for Append's hflush. (Konstantin Boudnik
+ via szetszwo)
+
BUG FIXES
HDFS-76. Better error message to users when commands fail because of
@@ -270,6 +341,28 @@
HDFS-637. DataNode sends a Success ack when block write fails. (hairong)
+ HDFS-547. TestHDFSFileSystemContract#testOutputStreamClosedTwice
+ sometimes fails with ClosedByInterruptException. (hairong)
+
+ HDFS-588. Fix TestFiDataTransferProtocol and TestAppend2 failures. (shv)
+
+ HDFS-550. DataNode restarts may introduce corrupt/duplicated/lost replicas
+ when handling detached replicas. (hairong)
+
+ HDFS-659. If the last block is not complete, update its length with
+ one of its replica's length stored in datanode. (szetszwo)
+
+ HDFS-649. Check null pointers for DataTransferTest. (Konstantin Boudnik
+ via szetszwo)
+
+ HDFS-661. DataNode upgrade fails on non-existent current directory.
+ (hairong)
+
+ HDFS-597. Modification introduced by HDFS-537 breaks an advice binding in
+ FSDatasetAspects. (Konstantin Boudnik via szetszwo)
+
+ HDFS-665. TestFileAppend2 sometimes hangs. (hairong)
+
Release 0.20.1 - 2009-09-01
IMPROVEMENTS
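
The client-visible effect of HDFS-536 and HDFS-570 above is that flushed
data becomes readable while the last block is still under construction.
A minimal usage sketch, not part of this commit (the path is an
illustrative assumption; sync() remains as a deprecated alias for the new
hflush()):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadWhileWritingSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path p = new Path("/tmp/hflush-demo");   // illustrative path
    FSDataOutputStream out = fs.create(p);
    out.writeBytes("first record\n");
    out.sync();           // deprecated alias for hflush() after this merge
    // Another client can already read the flushed bytes, although the
    // last block is still being written on the datanodes.
    FSDataInputStream in = fs.open(p);
    byte[] buf = new byte[64];
    int n = in.read(buf);
    System.out.println(n + " bytes visible");
    in.close();
    out.close();
  }
}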
Propchange: hadoop/hdfs/branches/branch-0.21/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 30 22:57:30 2009
@@ -1,3 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs/build.xml:713112
/hadoop/core/trunk/build.xml:779102
+/hadoop/hdfs/branches/HDFS-265/build.xml:796829-820463
/hadoop/hdfs/trunk/build.xml:818294-818298
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/
------------------------------------------------------------------------------
svn:mergeinfo =
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/build.xml
------------------------------------------------------------------------------
svn:mime-type = text/plain
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/client/
------------------------------------------------------------------------------
svn:mergeinfo =
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/client/BlockForensics.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/ivy/
------------------------------------------------------------------------------
svn:mergeinfo =
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/ivy.xml
------------------------------------------------------------------------------
svn:mime-type = text/plain
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/ivy/libraries.properties
------------------------------------------------------------------------------
svn:mime-type = text/plain
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/src/
------------------------------------------------------------------------------
svn:mergeinfo =
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/src/java/
------------------------------------------------------------------------------
svn:mergeinfo =
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/src/java/org/
------------------------------------------------------------------------------
svn:mergeinfo =
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/src/java/org/apache/
------------------------------------------------------------------------------
svn:mergeinfo =
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/src/java/org/apache/hadoop/
------------------------------------------------------------------------------
svn:mergeinfo =
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/src/java/org/apache/hadoop/block_forensics/
------------------------------------------------------------------------------
svn:mergeinfo =
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/block_forensics/src/java/org/apache/hadoop/block_forensics/BlockSearch.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 30 22:57:30 2009
@@ -1,3 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs/src/contrib/hdfsproxy:713112
/hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
+/hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
/hadoop/hdfs/trunk/src/contrib/hdfsproxy:818294-818298
Modified: hadoop/hdfs/branches/branch-0.21/src/docs/src/documentation/content/xdocs/hdfs_imageviewer.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/docs/src/documentation/content/xdocs/hdfs_imageviewer.xml?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/docs/src/documentation/content/xdocs/hdfs_imageviewer.xml (original)
+++ hadoop/hdfs/branches/branch-0.21/src/docs/src/documentation/content/xdocs/hdfs_imageviewer.xml Wed Sep 30 22:57:30 2009
@@ -219,7 +219,7 @@
PermString = rwxr-xr-x
-â¦remaining output omittedâ¦
+…remaining output omitted…
</source>
</section> <!-- example-->
Propchange: hadoop/hdfs/branches/branch-0.21/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 30 22:57:30 2009
@@ -1,3 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs/src/java:713112
/hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
+/hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
/hadoop/hdfs/trunk/src/java:818294-818298
Propchange: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/BlockMissingException.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java Wed Sep 30 22:57:30 2009
@@ -36,8 +36,6 @@
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -86,6 +84,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -1432,8 +1431,8 @@
int dataLen = in.readInt();
// Sanity check the lengths
- if ( dataLen < 0 ||
- ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
+ if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
+ ( dataLen != 0 && lastPacketInBlock) ||
(seqno != (lastSeqNo + 1)) ) {
throw new IOException("BlockReader: error in packet header" +
"(chunkOffset : " + chunkOffset +
@@ -1612,6 +1611,7 @@
private BlockReader blockReader = null;
private boolean verifyChecksum;
private LocatedBlocks locatedBlocks = null;
+ private long lastBlockBeingWrittenLength = 0;
private DatanodeInfo currentNode = null;
private Block currentBlock = null;
private long pos = 0;
@@ -1644,6 +1644,9 @@
*/
synchronized void openInfo() throws IOException {
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("newInfo = " + newInfo);
+ }
if (newInfo == null) {
throw new IOException("Cannot open filename " + src);
}
@@ -1658,11 +1661,46 @@
}
}
this.locatedBlocks = newInfo;
+ this.lastBlockBeingWrittenLength = 0;
+ if (!locatedBlocks.isLastBlockComplete()) {
+ final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
+ if (last != null) {
+ final long len = readBlockLength(last);
+ last.getBlock().setNumBytes(len);
+ this.lastBlockBeingWrittenLength = len;
+ }
+ }
+
this.currentNode = null;
}
+
+ /** Read the block length from one of the datanodes. */
+ private long readBlockLength(LocatedBlock locatedblock) throws IOException {
+ if (locatedblock == null || locatedblock.getLocations().length == 0) {
+ return 0;
+ }
+ for(DatanodeInfo datanode : locatedblock.getLocations()) {
+ try {
+ final ClientDatanodeProtocol cdp = createClientDatanodeProtocolProxy(
+ datanode, conf);
+ final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
+ if (n >= 0) {
+ return n;
+ }
+ }
+ catch(IOException ioe) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Faild to getReplicaVisibleLength from datanode "
+ + datanode + " for block " + locatedblock.getBlock(), ioe);
+ }
+ }
+ }
+ throw new IOException("Cannot obtain block length for " + locatedblock);
+ }
public synchronized long getFileLength() {
- return (locatedBlocks == null) ? 0 : locatedBlocks.getFileLength();
+ return locatedBlocks == null? 0:
+ locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
}
/**
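
The readBlockLength() method above polls each datanode holding the
under-construction replica and returns the first non-negative visible
length, which getFileLength() then adds to the NameNode-reported length.
A condensed sketch of that fallback pattern; VisibleLengthSource is a
hypothetical stand-in for the ClientDatanodeProtocol proxy:

import java.io.IOException;
import java.util.List;

interface VisibleLengthSource {
  long getReplicaVisibleLength() throws IOException;
}

class BlockLengthProbe {
  /** Return the first non-negative visible length; fail if none answers. */
  static long readBlockLength(List<VisibleLengthSource> replicas)
      throws IOException {
    for (VisibleLengthSource r : replicas) {
      try {
        long n = r.getReplicaVisibleLength();
        if (n >= 0) {
          return n;               // first responsive datanode wins
        }
      } catch (IOException ioe) {
        // fall through and try the next replica, as the loop above does
      }
    }
    throw new IOException("Cannot obtain block length");
  }
}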
@@ -1698,17 +1736,36 @@
private synchronized LocatedBlock getBlockAt(long offset,
boolean updatePosition) throws IOException {
assert (locatedBlocks != null) : "locatedBlocks is null";
- // search cached blocks first
- int targetBlockIdx = locatedBlocks.findBlock(offset);
- if (targetBlockIdx < 0) { // block is not cached
- targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
- // fetch more blocks
- LocatedBlocks newBlocks;
- newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
- assert (newBlocks != null) : "Could not find target position " + offset;
- locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+
+ final LocatedBlock blk;
+
+ //check offset
+ if (offset < 0 || offset >= getFileLength()) {
+ throw new IOException("offset < 0 || offset > getFileLength(), offset="
+ + offset
+ + ", updatePosition=" + updatePosition
+ + ", locatedBlocks=" + locatedBlocks);
+ }
+ else if (offset >= locatedBlocks.getFileLength()) {
+ // offset to the portion of the last block,
+ // which is not known to the name-node yet;
+ // getting the last block
+ blk = locatedBlocks.getLastLocatedBlock();
+ }
+ else {
+ // search cached blocks first
+ int targetBlockIdx = locatedBlocks.findBlock(offset);
+ if (targetBlockIdx < 0) { // block is not cached
+ targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+ // fetch more blocks
+ LocatedBlocks newBlocks;
+ newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
+ assert (newBlocks != null) : "Could not find target position " + offset;
+ locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+ }
+ blk = locatedBlocks.get(targetBlockIdx);
}
- LocatedBlock blk = locatedBlocks.get(targetBlockIdx);
+
// update current position
if (updatePosition) {
this.pos = offset;
@@ -1745,6 +1802,27 @@
private synchronized List<LocatedBlock> getBlockRange(long offset,
long length)
throws IOException {
+ final List<LocatedBlock> blocks;
+ if (locatedBlocks.isLastBlockComplete()) {
+ blocks = getFinalizedBlockRange(offset, length);
+ }
+ else {
+ if (length + offset > locatedBlocks.getFileLength()) {
+ length = locatedBlocks.getFileLength() - offset;
+ }
+ blocks = getFinalizedBlockRange(offset, length);
+ blocks.add(locatedBlocks.getLastLocatedBlock());
+ }
+ return blocks;
+ }
+
+ /**
+ * Get blocks in the specified range.
+ * Includes only the complete blocks.
+ * Fetch them from the namenode if not cached.
+ */
+ private synchronized List<LocatedBlock> getFinalizedBlockRange(
+ long offset, long length) throws IOException {
assert (locatedBlocks != null) : "locatedBlocks is null";
List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
// search cached blocks first
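
The split above keeps getBlockRange() correct for files with an
under-construction tail: the request is clamped to the finalized prefix
and the last located block is appended separately. A small runnable
sketch of the clamping arithmetic, with illustrative sizes:

public class BlockRangeClampSketch {
  public static void main(String[] args) {
    long fileLength = 128L << 20;   // NameNode-known finalized prefix: 128 MB
    long offset = 127L << 20;       // read request starts at 127 MB ...
    long length = 4L << 20;         // ... and asks for 4 MB
    if (length + offset > fileLength) {
      length = fileLength - offset; // clamp to the finalized range: 1 MB
    }
    System.out.println("finalized bytes to fetch: " + length);
    // getFinalizedBlockRange() serves this 1 MB; the under-construction
    // last block is then added, contributing its visible length.
  }
}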
@@ -2312,7 +2390,7 @@
private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
private Packet currentPacket = null;
- private DataStreamer streamer = new DataStreamer();
+ private DataStreamer streamer;
private long currentSeqno = 0;
private long bytesCurBlock = 0; // bytes written in current block
private int packetSize = 0; // write packet size, including the header.
@@ -2419,6 +2497,18 @@
buffer.reset();
return buffer;
}
+
+ // get the packet's last byte's offset in the block
+ long getLastByteOffsetBlock() {
+ return offsetInBlock + dataPos - dataStart;
+ }
+
+ public String toString() {
+ return "packet seqno:" + this.seqno +
+ " offsetInBlock:" + this.offsetInBlock +
+ " lastPacketInBlock:" + this.lastPacketInBlock +
+ " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
+ }
}
//
@@ -2430,18 +2520,101 @@
// of them are received, the DataStreamer closes the current block.
//
class DataStreamer extends Daemon {
- private static final int MAX_RECOVERY_ERROR_COUNT = 5; // try block recovery 5 times
- private int recoveryErrorCount = 0; // number of times block recovery failed
private volatile boolean streamerClosed = false;
- private Block block;
+ private Block block; // its length is number of bytes acked
private AccessToken accessToken;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
private ResponseProcessor response = null;
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
volatile boolean hasError = false;
- volatile int errorIndex = 0;
-
+ volatile int errorIndex = -1;
+ private BlockConstructionStage stage; // block construction stage
+ private long bytesSent = 0; // number of bytes that've been sent
+
+ /**
+ * Default construction for file create
+ */
+ private DataStreamer() {
+ stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+ }
+
+ /**
+ * Construct a data streamer for append
+ * @param lastBlock last block of the file to be appended
+ * @param stat status of the file to be appended
+ * @param bytesPerChecksum number of bytes per checksum
+ * @throws IOException if error occurs
+ */
+ private DataStreamer(LocatedBlock lastBlock, FileStatus stat,
+ int bytesPerChecksum) throws IOException {
+ stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
+ block = lastBlock.getBlock();
+ bytesSent = block.getNumBytes();
+ accessToken = lastBlock.getAccessToken();
+ long usedInLastBlock = stat.getLen() % blockSize;
+ int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+
+ // calculate the amount of free space in the pre-existing
+ // last crc chunk
+ int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+ int freeInCksum = bytesPerChecksum - usedInCksum;
+
+ // if there is space in the last block, then we have to
+ // append to that block
+ if (freeInLastBlock == blockSize) {
+ throw new IOException("The last block for file " +
+ src + " is full.");
+ }
+
+ if (usedInCksum > 0 && freeInCksum > 0) {
+ // if there is space in the last partial chunk, then
+ // setup in such a way that the next packet will have only
+ // one chunk that fills up the partial chunk.
+ //
+ computePacketChunkSize(0, freeInCksum);
+ resetChecksumChunk(freeInCksum);
+ appendChunk = true;
+ } else {
+ // if the remaining space in the block is smaller than
+ // the expected size of a packet, then create a
+ // smaller packet.
+ //
+ computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock),
+ bytesPerChecksum);
+ }
+
+ // setup pipeline to append to the last block XXX retries??
+ nodes = lastBlock.getLocations();
+ errorIndex = -1; // no errors yet.
+ if (nodes.length < 1) {
+ throw new IOException("Unable to retrieve blocks locations " +
+ " for last block " + block +
+ "of file " + src);
+
+ }
+ }
+
+ /**
+ * Initialize for data streaming
+ */
+ private void initDataStreaming() {
+ this.setName("DataStreamer for file " + src +
+ " block " + block);
+ response = new ResponseProcessor(nodes);
+ response.start();
+ stage = BlockConstructionStage.DATA_STREAMING;
+ }
+
+ private void endBlock() {
+ LOG.debug("Closing old block " + block);
+ this.setName("DataStreamer for file " + src);
+ closeResponder();
+ closeStream();
+ nodes = null;
+ stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+ }
+
/*
* streamer thread is the only thread that opens streams to datanode,
* and closes them. Any error recovery is also done by this thread.
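
The append constructor above chooses between two packet layouts: when the
file ends inside a checksum chunk, the first packet must carry exactly one
chunk that tops up the partial chunk so the existing CRC remains valid.
A worked sketch of that arithmetic, with illustrative sizes:

public class AppendChunkMathSketch {
  public static void main(String[] args) {
    long blockSize = 64L << 20;    // 64 MB blocks (illustrative)
    int bytesPerChecksum = 512;    // CRC chunk size (illustrative)
    long fileLen = 1000;           // existing file length (illustrative)

    long usedInLastBlock = fileLen % blockSize;           // 1000
    int freeInLastBlock = (int) (blockSize - usedInLastBlock);
    int usedInCksum = (int) (fileLen % bytesPerChecksum); // 488
    int freeInCksum = bytesPerChecksum - usedInCksum;     // 24

    if (usedInCksum > 0 && freeInCksum > 0) {
      // mirrors computePacketChunkSize(0, freeInCksum) and
      // resetChecksumChunk(freeInCksum) above: the next packet carries a
      // single 24-byte chunk that fills the partial CRC chunk
      System.out.println("first append chunk: " + freeInCksum + " bytes");
    }
    System.out.println("room left in last block: " + freeInLastBlock);
  }
}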
@@ -2461,47 +2634,69 @@
Packet one = null;
- // process IO errors if any
- boolean doSleep = processDatanodeError(hasError, false);
+ try {
+ // process datanode IO errors if any
+ boolean doSleep = false;
+ if (hasError && errorIndex>=0) {
+ doSleep = processDatanodeError();
+ }
- synchronized (dataQueue) {
- // wait for a packet to be sent.
- while ((!streamerClosed && !hasError && clientRunning
- && dataQueue.size() == 0) || doSleep) {
- try {
- dataQueue.wait(1000);
- } catch (InterruptedException e) {
+ synchronized (dataQueue) {
+ // wait for a packet to be sent.
+ while ((!streamerClosed && !hasError && clientRunning
+ && dataQueue.size() == 0) || doSleep) {
+ try {
+ dataQueue.wait(1000);
+ } catch (InterruptedException e) {
+ }
+ doSleep = false;
}
- doSleep = false;
- }
- if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
- continue;
+ if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
+ continue;
+ }
+ // get packet to be sent.
+ one = dataQueue.getFirst();
}
- // get packet to be sent.
- one = dataQueue.getFirst();
- }
-
- try {
- long offsetInBlock = one.offsetInBlock;
// get new block from namenode.
- if (blockStream == null) {
+ if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
LOG.debug("Allocating new block");
- nodes = nextBlockOutputStream(src);
- this.setName("DataStreamer for file " + src +
- " block " + block);
- response = new ResponseProcessor(nodes);
- response.start();
+ nodes = nextBlockOutputStream(src);
+ initDataStreaming();
+ } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
+ LOG.debug("Append to block " + block);
+ setupPipelineForAppendOrRecovery();
+ initDataStreaming();
}
- if (offsetInBlock >= blockSize) {
+ long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
+ if (lastByteOffsetInBlock > blockSize) {
throw new IOException("BlockSize " + blockSize +
" is smaller than data size. " +
" Offset of packet in block " +
- offsetInBlock +
+ lastByteOffsetInBlock +
" Aborting file " + src);
}
+ if (one.lastPacketInBlock) {
+ // wait until all data packets have been successfully acked
+ synchronized (dataQueue) {
+ while (!streamerClosed && !hasError &&
+ ackQueue.size() != 0 && clientRunning) {
+ try {
+ // wait for acks to arrive from datanodes
+ dataQueue.wait(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ if (streamerClosed || hasError || !clientRunning) {
+ continue;
+ }
+ stage = BlockConstructionStage.PIPELINE_CLOSE;
+ }
+
+ // send the packet
ByteBuffer buf = one.getBuffer();
synchronized (dataQueue) {
@@ -2511,19 +2706,45 @@
dataQueue.notifyAll();
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DataStreamer block " + block +
+ " sending packet " + one);
+ }
+
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
+ blockStream.flush();
+
+ // update bytesSent
+ long tmpBytesSent = one.getLastByteOffsetBlock();
+ if (bytesSent < tmpBytesSent) {
+ bytesSent = tmpBytesSent;
+ }
+ if (streamerClosed || hasError || !clientRunning) {
+ continue;
+ }
+
+ // Is this block full?
if (one.lastPacketInBlock) {
- blockStream.writeInt(0); // indicate end-of-block
+ // wait until the close packet has been acked
+ synchronized (dataQueue) {
+ while (!streamerClosed && !hasError &&
+ ackQueue.size() != 0 && clientRunning) {
+ dataQueue.wait(1000);// wait for acks to arrive from datanodes
+ }
+ }
+ if (streamerClosed || hasError || !clientRunning) {
+ continue;
+ }
+
+ endBlock();
}
- blockStream.flush();
- if (LOG.isDebugEnabled()) {
- LOG.debug("DataStreamer block " + block +
- " wrote packet seqno:" + one.seqno +
- " size:" + buf.remaining() +
- " offsetInBlock:" + one.offsetInBlock +
- " lastPacketInBlock:" + one.lastPacketInBlock);
+ if (progress != null) { progress.progress(); }
+
+ // This is used by unit tests to trigger race conditions.
+ if (artificialSlowdown != 0 && clientRunning) {
+ Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
LOG.warn("DataStreamer Exception: " +
@@ -2532,47 +2753,16 @@
setLastException((IOException)e);
}
hasError = true;
- }
-
-
- if (streamerClosed || hasError || !clientRunning) {
- continue;
- }
-
- // Is this block full?
- if (one.lastPacketInBlock) {
- synchronized (dataQueue) {
- while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
- try {
- dataQueue.wait(1000); // wait for acks to arrive from datanodes
- } catch (InterruptedException e) {
- }
- }
- }
- if (streamerClosed || hasError || !clientRunning) {
- continue;
+ if (errorIndex == -1) { // not a datanode error
+ streamerClosed = true;
}
-
- LOG.debug("Closing old block " + block);
- this.setName("DataStreamer for file " + src);
- closeResponder();
- closeStream();
- nodes = null;
- }
- if (progress != null) { progress.progress(); }
-
- // This is used by unit test to trigger race conditions.
- if (artificialSlowdown != 0 && clientRunning) {
- try {
- Thread.sleep(artificialSlowdown);
- } catch (InterruptedException e) {}
}
}
closeInternal();
}
private void closeInternal() {
- closeResponder();
+ closeResponder(); // close and join
closeStream();
streamerClosed = true;
closed = true;
@@ -2585,10 +2775,16 @@
* close both streamer and DFSOutputStream, should be called only
* by an external thread and only after all data to be sent has
* been flushed to datanode.
+ *
+ * Interrupt this data streamer if force is true
+ *
+ * @param force if this data stream is forced to be closed
*/
- void close() {
+ void close(boolean force) {
streamerClosed = true;
- this.interrupt();
+ if (force) {
+ this.interrupt();
+ }
}
private void closeResponder() {
@@ -2646,12 +2842,12 @@
// verify seqno from datanode
long seqno = blockReplyStream.readLong();
LOG.debug("DFSClient received ack for seqno " + seqno);
+ Packet one = null;
if (seqno == -1) {
continue;
} else if (seqno == -2) {
// do nothing
} else {
- Packet one = null;
synchronized (dataQueue) {
one = ackQueue.getFirst();
}
@@ -2664,10 +2860,20 @@
}
// processes response status from all datanodes.
+ String replies = null;
+ if (LOG.isDebugEnabled()) {
+ replies = "DFSClient Replies for seqno " + seqno + " are";
+ }
for (int i = 0; i < targets.length && clientRunning; i++) {
final DataTransferProtocol.Status reply
= DataTransferProtocol.Status.read(blockReplyStream);
+ if (LOG.isDebugEnabled()) {
+ replies += " " + reply;
+ }
if (reply != SUCCESS) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(replies);
+ }
errorIndex = i; // first bad datanode
throw new IOException("Bad response " + reply +
" for block " + block +
@@ -2676,6 +2882,18 @@
}
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(replies);
+ }
+
+ if (one == null) {
+ throw new IOException("Panic: responder did not receive " +
+ "an ack for a packet: " + seqno);
+ }
+
+ // update bytesAcked
+ block.setNumBytes(one.getLastByteOffsetBlock());
+
synchronized (dataQueue) {
ackQueue.removeFirst();
dataQueue.notifyAll();
@@ -2686,6 +2904,7 @@
setLastException((IOException)e);
}
hasError = true;
+ errorIndex = errorIndex==-1 ? 0 : errorIndex;
synchronized (dataQueue) {
dataQueue.notifyAll();
}
@@ -2708,21 +2927,12 @@
// threads and mark stream as closed. Returns true if we should
// sleep for a while after returning from this call.
//
- private boolean processDatanodeError(boolean error, boolean isAppend) {
- if (!error) {
- return false;
- }
+ private boolean processDatanodeError() throws IOException {
if (response != null) {
LOG.info("Error Recovery for block " + block +
" waiting for responder to exit. ");
return true;
}
- if (errorIndex >= 0) {
- LOG.warn("Error Recovery for block " + block
- + " bad datanode[" + errorIndex + "] "
- + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
- }
-
closeStream();
// move packets from ack queue to front of the data queue
@@ -2731,31 +2941,57 @@
ackQueue.clear();
}
- boolean success = false;
- while (!success && !streamerClosed && clientRunning) {
- DatanodeInfo[] newnodes = null;
- if (nodes == null) {
- String msg = "Could not get block locations. " + "Source file \""
- + src + "\" - Aborting...";
- LOG.warn(msg);
- setLastException(new IOException(msg));
- streamerClosed = true;
- return false;
- }
- StringBuilder pipelineMsg = new StringBuilder();
- for (int j = 0; j < nodes.length; j++) {
- pipelineMsg.append(nodes[j].getName());
- if (j < nodes.length - 1) {
- pipelineMsg.append(", ");
+ boolean doSleep = setupPipelineForAppendOrRecovery();
+
+ if (!streamerClosed && clientRunning) {
+ if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
+ synchronized (dataQueue) {
+ dataQueue.remove(); // remove the end of block packet
+ dataQueue.notifyAll();
}
+ endBlock();
+ } else {
+ initDataStreaming();
}
+ }
+
+ return doSleep;
+ }
+
+
+ /**
+ * Open a DataOutputStream to a DataNode pipeline so that
+ * it can be written to.
+ * This happens when a file is appended or data streaming fails.
+ * It keeps on trying until a pipeline is set up.
+ */
+ private boolean setupPipelineForAppendOrRecovery() throws IOException {
+ // check number of datanodes
+ if (nodes == null || nodes.length == 0) {
+ String msg = "Could not get block locations. " + "Source file \""
+ + src + "\" - Aborting...";
+ LOG.warn(msg);
+ setLastException(new IOException(msg));
+ streamerClosed = true;
+ return false;
+ }
+
+ boolean success = false;
+ long newGS = 0L;
+ while (!success && !streamerClosed && clientRunning) {
+ boolean isRecovery = hasError;
// remove bad datanode from list of datanodes.
// If errorIndex was not set (i.e. appends), then do not remove
// any datanodes
//
- if (errorIndex < 0) {
- newnodes = nodes;
- } else {
+ if (errorIndex >= 0) {
+ StringBuilder pipelineMsg = new StringBuilder();
+ for (int j = 0; j < nodes.length; j++) {
+ pipelineMsg.append(nodes[j].getName());
+ if (j < nodes.length - 1) {
+ pipelineMsg.append(", ");
+ }
+ }
if (nodes.length <= 1) {
lastException = new IOException("All datanodes " + pipelineMsg
+ " are bad. Aborting...");
@@ -2765,86 +3001,32 @@
LOG.warn("Error Recovery for block " + block +
" in pipeline " + pipelineMsg +
": bad datanode " + nodes[errorIndex].getName());
- newnodes = new DatanodeInfo[nodes.length-1];
+ DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
newnodes.length-errorIndex);
+ nodes = newnodes;
+ this.hasError = false;
+ lastException = null;
+ errorIndex = -1;
}
- // Tell the primary datanode to do error recovery
- // by stamping appropriate generation stamps.
- //
- LocatedBlock newBlock = null;
- ClientDatanodeProtocol primary = null;
- DatanodeInfo primaryNode = null;
- try {
- // Pick the "least" datanode as the primary datanode to avoid deadlock.
- primaryNode = Collections.min(Arrays.asList(newnodes));
- primary = createClientDatanodeProtocolProxy(primaryNode, conf);
- newBlock = primary.recoverBlock(block, isAppend, newnodes);
- } catch (IOException e) {
- recoveryErrorCount++;
- if (recoveryErrorCount > MAX_RECOVERY_ERROR_COUNT) {
- if (nodes.length > 1) {
- // if the primary datanode failed, remove it from the list.
- // The original bad datanode is left in the list because it is
- // conservative to remove only one datanode in one iteration.
- for (int j = 0; j < nodes.length; j++) {
- if (nodes[j].equals(primaryNode)) {
- errorIndex = j; // forget original bad node.
- }
- }
- // remove primary node from list
- newnodes = new DatanodeInfo[nodes.length-1];
- System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
- System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
- newnodes.length-errorIndex);
- nodes = newnodes;
- LOG.warn("Error Recovery for block " + block + " failed "
- + " because recovery from primary datanode " + primaryNode
- + " failed " + recoveryErrorCount + " times. "
- + " Pipeline was " + pipelineMsg
- + ". Marking primary datanode as bad.");
- recoveryErrorCount = 0;
- errorIndex = -1;
- return true; // sleep when we return from here
- }
- String emsg = "Error Recovery for block " + block + " failed "
- + " because recovery from primary datanode " + primaryNode
- + " failed " + recoveryErrorCount + " times. "
- + " Pipeline was " + pipelineMsg + ". Aborting...";
- LOG.warn(emsg);
- lastException = new IOException(emsg);
- streamerClosed = true;
- return false; // abort with IOexception
- }
- LOG.warn("Error Recovery for block " + block + " failed "
- + " because recovery from primary datanode " + primaryNode
- + " failed " + recoveryErrorCount + " times. "
- + " Pipeline was " + pipelineMsg + ". Will retry...");
- return true; // sleep when we return from here
- } finally {
- RPC.stopProxy(primary);
- }
- recoveryErrorCount = 0; // block recovery successful
-
- // If the block recovery generated a new generation stamp, use that
- // from now on. Also, setup new pipeline
- // newBlock should never be null and it should contain a newly
- // generated access token.
- block = newBlock.getBlock();
- accessToken = newBlock.getAccessToken();
- nodes = newBlock.getLocations();
-
- this.hasError = false;
- lastException = null;
- errorIndex = 0;
- success = createBlockOutputStream(nodes, clientName, true);
+ // get a new generation stamp and an access token
+ LocatedBlock lb = namenode.updateBlockForPipeline(block, clientName);
+ newGS = lb.getBlock().getGenerationStamp();
+ accessToken = lb.getAccessToken();
+
+ // set up the pipeline again with the remaining nodes
+ success = createBlockOutputStream(nodes, newGS, isRecovery);
}
- if (!streamerClosed && clientRunning) {
- response = new ResponseProcessor(nodes);
- response.start();
+ if (success) {
+ // update pipeline at the namenode
+ Block newBlock = new Block(
+ block.getBlockId(), block.getNumBytes(), newGS);
+ namenode.updatePipeline(clientName, block, newBlock, nodes);
+ // update client side generation stamp
+ block = newBlock;
}
return false; // do not sleep, continue processing
}
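
The rewritten recovery path above replaces the old primary-datanode
recoverBlock() round trips: the client itself drops the bad node, fetches
a new generation stamp with updateBlockForPipeline(), rebuilds the
pipeline with the survivors, and commits the result with updatePipeline().
A compressed restatement of that control flow; RecoveryOps is a
hypothetical stand-in for the namenode RPC and the stream setup:

import java.io.IOException;

interface RecoveryOps {
  long updateBlockForPipeline() throws IOException; // new generation stamp
  boolean createBlockOutputStream(String[] nodes, long newGS,
                                  boolean isRecovery);
  void updatePipeline(long newGS, String[] nodes) throws IOException;
}

class PipelineRecoverySketch {
  static void recover(RecoveryOps ops, String[] nodes, int errorIndex)
      throws IOException {
    boolean success = false;
    long newGS = 0L;
    while (!success) {
      if (errorIndex >= 0) {
        // drop the bad datanode, keeping pipeline order
        String[] survivors = new String[nodes.length - 1];
        System.arraycopy(nodes, 0, survivors, 0, errorIndex);
        System.arraycopy(nodes, errorIndex + 1, survivors, errorIndex,
                         survivors.length - errorIndex);
        nodes = survivors;
        errorIndex = -1;
      }
      newGS = ops.updateBlockForPipeline();   // fresh stamp per attempt
      success = ops.createBlockOutputStream(nodes, newGS, true);
    }
    ops.updatePipeline(newGS, nodes);         // commit at the namenode
  }
}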
@@ -2864,24 +3046,26 @@
do {
hasError = false;
lastException = null;
- errorIndex = 0;
+ errorIndex = -1;
retry = false;
success = false;
long startTime = System.currentTimeMillis();
lb = locateFollowingBlock(startTime);
block = lb.getBlock();
+ block.setNumBytes(0);
accessToken = lb.getAccessToken();
nodes = lb.getLocations();
//
// Connect to first DataNode in the list.
//
- success = createBlockOutputStream(nodes, clientName, false);
+ success = createBlockOutputStream(nodes, 0L, false);
if (!success) {
LOG.info("Abandoning block " + block);
namenode.abandonBlock(block, src, clientName);
+ block = null;
// Connection failed. Let's wait a little bit and retry
retry = true;
@@ -2904,7 +3088,7 @@
// connects to the first datanode in the pipeline
// Returns true if success, otherwise return failure.
//
- private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
+ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
boolean recoveryFlag) {
DataTransferProtocol.Status pipelineStatus = SUCCESS;
String firstBadLink = "";
@@ -2939,9 +3123,11 @@
DataNode.SMALL_BUFFER_SIZE));
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
+ // send the request
DataTransferProtocol.Sender.opWriteBlock(out,
- block.getBlockId(), block.getGenerationStamp(), nodes.length,
- recoveryFlag, client, null, nodes, accessToken);
+ block.getBlockId(), block.getGenerationStamp(),
+ nodes.length, recoveryFlag?stage.getRecoveryStage():stage, newGS,
+ block.getNumBytes(), bytesSent, clientName, null, nodes, accessToken);
checksum.writeHeader(out);
out.flush();
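
The opWriteBlock request above now carries the construction stage and both
byte counters. The labels below are editorial glosses on the arguments
actually passed, not names from the protocol:

// DataTransferProtocol.Sender.opWriteBlock(out,
//     block.getBlockId(),            block being written
//     block.getGenerationStamp(),    stamp the datanode must match
//     nodes.length,                  pipeline size
//     recoveryFlag ? stage.getRecoveryStage() : stage,
//                                    create/append/recovery stage
//     newGS,                         new stamp (0L for a fresh block)
//     block.getNumBytes(),           bytes already acked
//     bytesSent,                     bytes already sent downstream
//     clientName, null, nodes, accessToken);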
@@ -2974,6 +3160,8 @@
break;
}
}
+ } else {
+ errorIndex = 0;
}
hasError = true;
setLastException(ie);
@@ -2989,7 +3177,7 @@
long localstart = System.currentTimeMillis();
while (true) {
try {
- return namenode.addBlock(src, clientName);
+ return namenode.addBlock(src, clientName, block);
} catch (RemoteException e) {
IOException ue =
e.unwrapRemoteException(FileNotFoundException.class,
@@ -3030,52 +3218,8 @@
}
}
- void initAppend(LocatedBlock lastBlock, FileStatus stat,
- int bytesPerChecksum) throws IOException {
- block = lastBlock.getBlock();
- accessToken = lastBlock.getAccessToken();
- long usedInLastBlock = stat.getLen() % blockSize;
- int freeInLastBlock = (int)(blockSize - usedInLastBlock);
-
- // calculate the amount of free space in the pre-existing
- // last crc chunk
- int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
- int freeInCksum = bytesPerChecksum - usedInCksum;
-
- // if there is space in the last block, then we have to
- // append to that block
- if (freeInLastBlock > blockSize) {
- throw new IOException("The last block for file " +
- src + " is full.");
- }
-
- if (usedInCksum > 0 && freeInCksum > 0) {
- // if there is space in the last partial chunk, then
- // setup in such a way that the next packet will have only
- // one chunk that fills up the partial chunk.
- //
- computePacketChunkSize(0, freeInCksum);
- resetChecksumChunk(freeInCksum);
- appendChunk = true;
- } else {
- // if the remaining space in the block is smaller than
- // that expected size of of a packet, then create
- // smaller size packet.
- //
- computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock),
- bytesPerChecksum);
- }
-
- // setup pipeline to append to the last block XXX retries??
- nodes = lastBlock.getLocations();
- errorIndex = -1; // no errors yet.
- if (nodes.length < 1) {
- throw new IOException("Unable to retrieve blocks locations " +
- " for last block " + block +
- "of file " + src);
-
- }
- processDatanodeError(true, true);
+ Block getBlock() {
+ return block;
}
DatanodeInfo[] getNodes() {
@@ -3160,6 +3304,7 @@
NSQuotaExceededException.class,
DSQuotaExceededException.class);
}
+ streamer = new DataStreamer();
streamer.start();
}
@@ -3179,9 +3324,10 @@
if (lastBlock != null) {
// indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize();
- streamer.initAppend(lastBlock, stat, bytesPerChecksum);
+ streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
} else {
computePacketChunkSize(writePacketSize, bytesPerChecksum);
+ streamer = new DataStreamer();
}
streamer.start();
}
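
initAppend() is replaced by a dedicated DataStreamer constructor, so a stream commits to either the create path or the append path at construction time instead of being mutated afterwards. A condensed skeleton of the two paths (bodies elided; this is a sketch, not the patch's code):

  // Sketch: the two ways a DataStreamer now comes into existence.
  class DataStreamer extends Thread {
    DataStreamer() {
      // create path: no block yet; the streamer will ask the namenode
      // for one (addBlock) when the first packet is queued
    }

    DataStreamer(LocatedBlock lastBlock, FileStatus stat,
                 int bytesPerChecksum) throws IOException {
      // append path: adopt the file's last block, size packets around
      // the partial crc chunk, and pipeline to the existing replicas
    }
  }
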
@@ -3270,15 +3416,6 @@
", blockSize=" + blockSize +
", appendChunk=" + appendChunk);
}
- //
- // if we allocated a new packet because we encountered a block
- // boundary, reset bytesCurBlock.
- //
- if (bytesCurBlock == blockSize) {
- currentPacket.lastPacketInBlock = true;
- bytesCurBlock = 0;
- lastFlushOffset = -1;
- }
waitAndQueuePacket(currentPacket);
currentPacket = null;
@@ -3291,15 +3428,41 @@
}
int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
computePacketChunkSize(psize, bytesPerChecksum);
+
+ //
+ // if encountering a block boundary, send an empty packet to
+ // indicate the end of block and reset bytesCurBlock.
+ //
+ if (bytesCurBlock == blockSize) {
+ currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
+ bytesCurBlock);
+ currentPacket.lastPacketInBlock = true;
+ waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ bytesCurBlock = 0;
+ lastFlushOffset = -1;
+ }
}
}
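
A block boundary is now signalled by a dedicated empty packet rather than by flagging the final data packet, which keeps end-of-block independent of how the last data packet happened to be sized. The shape of the signal, as a sketch:

  // Sketch: end-of-block is an empty packet carrying only the flag.
  if (bytesCurBlock == blockSize) {
    Packet eob = new Packet(DataNode.PKT_HEADER_LEN + 4, 0, bytesCurBlock);
    eob.lastPacketInBlock = true;  // no payload; just "this block is done"
    waitAndQueuePacket(eob);       // honors the normal flow-control window
    bytesCurBlock = 0;             // subsequent writes open a new block
    lastFlushOffset = -1;
  }

Decoupling the marker from data packets is also what lets hflush() below push a partially filled packet without accidentally closing the block.
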
/**
- * All data is written out to datanodes. It is not guaranteed
- * that data has been flushed to persistent store on the
- * datanode. Block allocations are persisted on namenode.
+ * @deprecated As of HDFS 0.21.0, replaced by hflush
+ * @see #hflush()
*/
+ @Deprecated
public synchronized void sync() throws IOException {
+ hflush();
+ }
+
+ /**
+ * All data is flushed out to datanodes.
+ * It is a synchronous operation. When it returns,
+ * it guarantees that flushed data becomes visible to new readers.
+ * It is not guaranteed that data has been flushed to
+ * persistent store on the datanode.
+ * Block allocations are persisted on namenode.
+ */
+ public synchronized void hflush() throws IOException {
checkOpen();
isClosed();
try {
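
sync() survives only as a deprecated alias; hflush() is the documented operation, and its contract is visibility, not durability. A hedged usage sketch from an application's point of view (assuming the stream is obtained through the usual FileSystem API and exposes hflush()):

  // Sketch: hflush() makes written bytes visible to new readers.
  FSDataOutputStream out = fs.create(new Path("/logs/events.log"));
  out.write(recordBytes);
  out.hflush();  // returns once new readers can see recordBytes;
                 // does NOT promise the bytes are on datanode disks
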
@@ -3343,7 +3506,7 @@
}
} catch (IOException e) {
lastException = new IOException("IOException flush:" + e);
- closeThreads();
+ closeThreads(true);
throw e;
}
}
@@ -3384,13 +3547,14 @@
}
streamer.setLastException(new IOException("Lease timeout of " +
(hdfsTimeout/1000) + " seconds expired."));
- closeThreads();
+ closeThreads(true);
}
// shutdown datastreamer and responseprocessor threads.
- private void closeThreads() throws IOException {
+ // interrupt datastreamer if force is true
+ private void closeThreads(boolean force) throws IOException {
try {
- streamer.close();
+ streamer.close(force);
streamer.join();
if (s != null) {
s.close();
@@ -3421,21 +3585,22 @@
try {
flushBuffer(); // flush from all upper layers
- // Mark that this packet is the last packet in block.
- // If there are no outstanding packets and the last packet
- // was not the last one in the current block, then create a
- // packet with empty payload.
- if (currentPacket == null && bytesCurBlock != 0) {
- currentPacket = new Packet(packetSize, chunksPerPacket,
- bytesCurBlock);
- }
if (currentPacket != null) {
+ waitAndQueuePacket(currentPacket);
+ }
+
+ if (bytesCurBlock != 0) {
+ // send an empty packet to mark the end of the block
+ currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
+ bytesCurBlock);
currentPacket.lastPacketInBlock = true;
}
flushInternal(); // flush all data to Datanodes
- closeThreads();
- completeFile();
+ // get last block before destroying the streamer
+ Block lastBlock = streamer.getBlock();
+ closeThreads(false);
+ completeFile(lastBlock);
leasechecker.remove(src);
} finally {
closed = true;
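
The new close() sequence queues whatever data packet is pending, emits the empty end-of-block packet if the block has bytes, waits for acks, and only then tears the streamer down, taking care to read the streamer's block first because completeFile() still needs it. In outline (queueEndOfBlockPacket() is an illustrative stand-in for the inline code above):

  // Sketch: ordering that close() must preserve after this patch.
  flushBuffer();                                    // user data -> currentPacket
  if (currentPacket != null) waitAndQueuePacket(currentPacket);
  if (bytesCurBlock != 0) queueEndOfBlockPacket();  // empty, lastPacketInBlock
  flushInternal();                                  // wait for pipeline acks
  Block last = streamer.getBlock();                 // grab before shutdown
  closeThreads(false);                              // graceful, no interrupt
  completeFile(last);                               // commit at the namenode
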
@@ -3444,11 +3609,11 @@
// should be called holding (this) lock since setTestFilename() may
// be called during unit tests
- private void completeFile() throws IOException {
+ private void completeFile(Block last) throws IOException {
long localstart = System.currentTimeMillis();
boolean fileComplete = false;
while (!fileComplete) {
- fileComplete = namenode.complete(src, clientName);
+ fileComplete = namenode.complete(src, clientName, last);
if (!fileComplete) {
if (!clientRunning ||
(hdfsTimeout > 0 &&
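
complete() keeps returning false until the file's blocks reach their minimum replication, so completeFile() is a poll loop; with the patch it also hands the namenode the final block so its length and generation stamp can be committed. The loop reduces to roughly:

  // Sketch: poll until the namenode declares the file complete.
  boolean fileComplete = false;
  while (!fileComplete) {
    fileComplete = namenode.complete(src, clientName, last);
    if (!fileComplete) {
      try {
        Thread.sleep(400);  // assumed back-off while replication catches up
      } catch (InterruptedException ignored) {
      }
    }
  }
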
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java Wed Sep 30 22:57:30 2009
@@ -18,6 +18,10 @@
package org.apache.hadoop.hdfs.protocol;
import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
/**
* This class provides an interface for accessing list of blocks that
@@ -25,41 +29,82 @@
* This class is useful for block report. Rather than send block reports
* as a Block[] we can send it as a long[].
*
+ * The structure of the array is as follows:
+ * 0: the length of the finalized replica list;
+ * 1: the length of the under-construction replica list;
+ * - followed by finalized replica list where each replica is represented by
+ * 3 longs: one for the blockId, one for the block length, and one for
+ * the generation stamp;
+ * - followed by an invalid delimiting replica represented by three -1s;
+ * - followed by the under-construction replica list where each replica is
+ * represented by 4 longs: three for the block id, length, and generation
+ * stamp, and the fourth for the replica state.
*/
-public class BlockListAsLongs implements Iterable<Block>{
+public class BlockListAsLongs implements Iterable<Block> {
/**
- * A block as 3 longs
+ * A finalized block as 3 longs
* block-id and block length and generation stamp
*/
- private static final int LONGS_PER_BLOCK = 3;
-
- private static int index2BlockId(int index) {
- return index*LONGS_PER_BLOCK;
- }
- private static int index2BlockLen(int index) {
- return (index*LONGS_PER_BLOCK) + 1;
- }
- private static int index2BlockGenStamp(int index) {
- return (index*LONGS_PER_BLOCK) + 2;
+ private static final int LONGS_PER_FINALIZED_BLOCK = 3;
+
+ /**
+ * An under-construction block as 4 longs
+ * block-id and block length, generation stamp and replica state
+ */
+ private static final int LONGS_PER_UC_BLOCK = 4;
+
+ /** Number of longs in the header */
+ private static final int HEADER_SIZE = 2;
+
+ /**
+ * Returns the index of the first long in blockList
+ * belonging to the specified block.
+ * The first long contains the block id.
+ */
+ private int index2BlockId(int blockIndex) {
+ if(blockIndex < 0 || blockIndex >= getNumberOfBlocks())
+ return -1;
+ int finalizedSize = getNumberOfFinalizedReplicas();
+ if(blockIndex < finalizedSize)
+ return HEADER_SIZE + blockIndex * LONGS_PER_FINALIZED_BLOCK;
+ return HEADER_SIZE + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
+ + (blockIndex - finalizedSize) * LONGS_PER_UC_BLOCK;
}
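
The arithmetic is easiest to check with concrete numbers. For a report with 2 finalized and 1 under-construction replica, the array holds 2 + (2+1)*3 + 1*4 = 15 longs, and the formulas above place the blocks as follows (a worked example derived from the code, not quoted from it):

  // finalizedSize = 2, ucSize = 1:
  //   longs  0..1   : header {2, 1}
  //   longs  2..4   : finalized #0    -> index2BlockId(0) == 2 + 0*3       == 2
  //   longs  5..7   : finalized #1    -> index2BlockId(1) == 2 + 1*3       == 5
  //   longs  8..10  : delimiter {-1, -1, -1}
  //   longs 11..14  : UC #0 (4 longs) -> index2BlockId(2) == 2 + 3*3 + 0*4 == 11
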
-
+
private long[] blockList;
/**
- * Converting a block[] to a long[]
- * @param blockArray - the input array block[]
- * @return the output array of long[]
+ * Create block report from finalized and under construction lists of blocks.
+ *
+ * @param finalized - list of finalized blocks
+ * @param uc - list of under construction blocks
*/
- public static long[] convertToArrayLongs(final Block[] blockArray) {
- long[] blocksAsLongs = new long[blockArray.length * LONGS_PER_BLOCK];
+ public BlockListAsLongs(final List<? extends Block> finalized,
+ final List<ReplicaInfo> uc) {
+ int finalizedSize = finalized == null ? 0 : finalized.size();
+ int ucSize = uc == null ? 0 : uc.size();
+ int len = HEADER_SIZE
+ + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
+ + ucSize * LONGS_PER_UC_BLOCK;
+
+ blockList = new long[len];
+
+ // set the header
+ blockList[0] = finalizedSize;
+ blockList[1] = ucSize;
+
+ // set finalized blocks
+ for (int i = 0; i < finalizedSize; i++) {
+ setBlock(i, finalized.get(i));
+ }
- BlockListAsLongs bl = new BlockListAsLongs(blocksAsLongs);
- assert bl.getNumberOfBlocks() == blockArray.length;
+ // set invalid delimiting block
+ setDelimitingBlock(finalizedSize);
- for (int i = 0; i < blockArray.length; i++) {
- bl.setBlock(i, blockArray[i]);
+ // set under construction blocks
+ for (int i = 0; i < ucSize; i++) {
+ setBlock(finalizedSize + i, uc.get(i));
}
- return blocksAsLongs;
}
public BlockListAsLongs() {
@@ -72,27 +117,29 @@
*/
public BlockListAsLongs(final long[] iBlockList) {
if (iBlockList == null) {
- blockList = new long[0];
- } else {
- if (iBlockList.length%LONGS_PER_BLOCK != 0) {
- // must be multiple of LONGS_PER_BLOCK
- throw new IllegalArgumentException();
- }
- blockList = iBlockList;
+ blockList = new long[HEADER_SIZE];
+ return;
}
+ blockList = iBlockList;
+ }
+
+ public long[] getBlockListAsLongs() {
+ return blockList;
}
/**
* Iterates over blocks in the block report.
* Avoids object allocation on each iteration.
*/
- private class BlockReportIterator implements Iterator<Block> {
+ public class BlockReportIterator implements Iterator<Block> {
private int currentBlockIndex;
private Block block;
+ private ReplicaState currentReplicaState;
BlockReportIterator() {
this.currentBlockIndex = 0;
this.block = new Block();
+ this.currentReplicaState = null;
}
public boolean hasNext() {
@@ -100,22 +147,39 @@
}
public Block next() {
- block.set(blockList[index2BlockId(currentBlockIndex)],
- blockList[index2BlockLen(currentBlockIndex)],
- blockList[index2BlockGenStamp(currentBlockIndex)]);
+ block.set(blockId(currentBlockIndex),
+ blockLength(currentBlockIndex),
+ blockGenerationStamp(currentBlockIndex));
+ currentReplicaState = blockReplicaState(currentBlockIndex);
currentBlockIndex++;
return block;
}
- public void remove() {
+ public void remove() {
throw new UnsupportedOperationException("Sorry. can't remove.");
}
+
+ /**
+ * Get the state of the current replica.
+ * The state corresponds to the replica returned
+ * by the latest {@link #next()}.
+ */
+ public ReplicaState getCurrentReplicaState() {
+ return currentReplicaState;
+ }
}
/**
* Returns an iterator over blocks in the block report.
*/
public Iterator<Block> iterator() {
+ return getBlockReportIterator();
+ }
+
+ /**
+ * Returns {@link BlockReportIterator}.
+ */
+ public BlockReportIterator getBlockReportIterator() {
return new BlockReportIterator();
}
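
Exposing BlockReportIterator lets a caller read each replica's state alongside the block while still reusing one Block instance per iteration. A usage sketch (reportArray stands for a long[] received in a block report):

  // Sketch: consume a block report, branching on replica state.
  BlockListAsLongs report = new BlockListAsLongs(reportArray);
  BlockListAsLongs.BlockReportIterator it = report.getBlockReportIterator();
  while (it.hasNext()) {
    Block b = it.next();  // the SAME Block object, mutated in place
    ReplicaState state = it.getCurrentReplicaState();
    if (state == ReplicaState.FINALIZED) {
      // account for a completed replica
    } else {
      // track an under-construction replica
    }
  }
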
@@ -124,7 +188,55 @@
* @return - the number of blocks
*/
public int getNumberOfBlocks() {
- return blockList.length/LONGS_PER_BLOCK;
+ assert blockList.length == HEADER_SIZE +
+ (blockList[0] + 1) * LONGS_PER_FINALIZED_BLOCK +
+ blockList[1] * LONGS_PER_UC_BLOCK :
+ "Number of blocks is inconcistent with the array length";
+ return getNumberOfFinalizedReplicas() + getNumberOfUCReplicas();
+ }
+
+ /**
+ * Returns the number of finalized replicas in the block report.
+ */
+ private int getNumberOfFinalizedReplicas() {
+ return (int)blockList[0];
+ }
+
+ /**
+ * Returns the number of under construction replicas in the block report.
+ */
+ private int getNumberOfUCReplicas() {
+ return (int)blockList[1];
+ }
+
+ /**
+ * Returns the id of the specified replica of the block report.
+ */
+ private long blockId(int index) {
+ return blockList[index2BlockId(index)];
+ }
+
+ /**
+ * Returns the length of the specified replica of the block report.
+ */
+ private long blockLength(int index) {
+ return blockList[index2BlockId(index) + 1];
+ }
+
+ /**
+ * Returns the generation stamp of the specified replica of the block report.
+ */
+ private long blockGenerationStamp(int index) {
+ return blockList[index2BlockId(index) + 2];
+ }
+
+ /**
+ * Returns the state of the specified replica of the block report.
+ */
+ private ReplicaState blockReplicaState(int index) {
+ if(index < getNumberOfFinalizedReplicas())
+ return ReplicaState.FINALIZED;
+ return ReplicaState.getState((int)blockList[index2BlockId(index) + 3]);
}
/**
@@ -134,7 +246,7 @@
*/
@Deprecated
public long getBlockId(final int index) {
- return blockList[index2BlockId(index)];
+ return blockId(index);
}
/**
@@ -144,7 +256,7 @@
*/
@Deprecated
public long getBlockLen(final int index) {
- return blockList[index2BlockLen(index)];
+ return blockLength(index);
}
/**
@@ -154,7 +266,7 @@
*/
@Deprecated
public long getBlockGenStamp(final int index) {
- return blockList[index2BlockGenStamp(index)];
+ return blockGenerationStamp(index);
}
/**
@@ -162,9 +274,28 @@
* @param index - the index of the block to set
* @param b - the block is set to the value of the this block
*/
- private void setBlock(final int index, final Block b) {
- blockList[index2BlockId(index)] = b.getBlockId();
- blockList[index2BlockLen(index)] = b.getNumBytes();
- blockList[index2BlockGenStamp(index)] = b.getGenerationStamp();
+ private <T extends Block> void setBlock(final int index, final T b) {
+ int pos = index2BlockId(index);
+ blockList[pos] = b.getBlockId();
+ blockList[pos + 1] = b.getNumBytes();
+ blockList[pos + 2] = b.getGenerationStamp();
+ if(index < getNumberOfFinalizedReplicas())
+ return;
+ assert ((ReplicaInfo)b).getState() != ReplicaState.FINALIZED :
+ "Must be under-construction replica.";
+ blockList[pos + 3] = ((ReplicaInfo)b).getState().getValue();
+ }
+
+ /**
+ * Set the invalid delimiting block between the finalized and
+ * the under-construction lists.
+ * The invalid block has all three fields set to -1.
+ * @param finalizedSize - the size of the finalized list
+ */
+ private void setDelimitingBlock(final int finalizedSize) {
+ int idx = HEADER_SIZE + finalizedSize * LONGS_PER_FINALIZED_BLOCK;
+ blockList[idx] = -1;
+ blockList[idx+1] = -1;
+ blockList[idx+2] = -1;
}
}
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Wed Sep 30 22:57:30 2009
@@ -29,19 +29,10 @@
public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
/**
- * 4: never return null and always return a newly generated access token
+ * 6: recoverBlock() removed.
*/
- public static final long versionID = 4L;
+ public static final long versionID = 6L;
- /** Start generation-stamp recovery for specified block
- * @param block the specified block
- * @param keepLength keep the block length
- * @param targets the list of possible locations of specified block
- * @return either a new generation stamp, or the original generation stamp.
- * Regardless of whether a new generation stamp is returned, a newly
- * generated access token is returned as part of the return value.
- * @throws IOException
- */
- LocatedBlock recoverBlock(Block block, boolean keepLength,
- DatanodeInfo[] targets) throws IOException;
+ /** Return the visible length of a replica. */
+ long getReplicaVisibleLength(Block b) throws IOException;
}
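
getReplicaVisibleLength() replaces client-driven recoverBlock(): instead of recovering a block itself, a reader of a still-open file asks the datanode how many bytes of the last block are visible. Sketched below (dn is an already-built ClientDatanodeProtocol proxy; proxy creation is elided):

  // Sketch: how a reader would use the new RPC.
  long visible = dn.getReplicaVisibleLength(lastLocatedBlock.getBlock());
  // bytes [0, visible) of the under-construction block are safe to read
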
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Sep 30 22:57:30 2009
@@ -44,9 +44,9 @@
* Compared to the previous version the following changes have been introduced:
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
- * 48: modified mkdirs() to take an additional boolean parameter
+ * 50: change LocatedBlocks to include last block information.
*/
- public static final long versionID = 48L;
+ public static final long versionID = 50L;
///////////////////////////////////////
// File contents
@@ -93,8 +93,8 @@
* {@link #rename(String, String)} it until the file is completed
* or explicitly as a result of lease expiration.
* <p>
- * Blocks have a maximum size. Clients that intend to
- * create multi-block files must also use {@link #addBlock(String, String)}.
+ * Blocks have a maximum size. Clients that intend to create
+ * multi-block files must also use {@link #addBlock(String, String, Block)}.
*
* @param src path of the file being created.
* @param masked masked permission.
@@ -187,9 +187,14 @@
* addBlock() allocates a new block and datanodes the block data
* should be replicated to.
*
+ * addBlock() also commits the previous block by reporting
+ * to the name-node the actual generation stamp and the length
+ * of the block that the client has transmitted to data-nodes.
+ *
* @return LocatedBlock allocated block information.
*/
- public LocatedBlock addBlock(String src, String clientName) throws IOException;
+ public LocatedBlock addBlock(String src, String clientName,
+ Block previous) throws IOException;
/**
* The client is done writing data to the given filename, and would
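
Threading the previous block through addBlock() folds the "commit the finished block" step into the "give me the next block" RPC. From the writer's side the calls chain roughly like this (previous is null on the first call; noMoreData is an assumed loop condition):

  // Sketch: each addBlock() commits the block before it.
  Block previous = null;
  while (!noMoreData) {
    LocatedBlock lb = namenode.addBlock(src, clientName, previous);
    // ... write the block's bytes through the pipeline ...
    previous = lb.getBlock();  // committed by the next addBlock()/complete()
  }
  namenode.complete(src, clientName, previous);  // commits the last block
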
@@ -197,13 +202,18 @@
*
* The function returns whether the file has been closed successfully.
* If the function returns false, the caller should try again.
+ *
+ * close() also commits the last block of the file by reporting
+ * to the name-node the actual generation stamp and the length
+ * of the block that the client has transmitted to data-nodes.
*
* A call to complete() will not return true until all the file's
* blocks have been replicated the minimum number of times. Thus,
* DataNode failures may cause a client to call complete() several
* times before succeeding.
*/
- public boolean complete(String src, String clientName) throws IOException;
+ public boolean complete(String src, String clientName,
+ Block last) throws IOException;
/**
* The client wants to report corrupted blocks (blocks with specified
@@ -500,4 +510,32 @@
* by this call.
*/
public void setTimes(String src, long mtime, long atime) throws IOException;
+
+ /**
+ * Get a new generation stamp together with an access token for
+ * a block under construction
+ *
+ * This method is called only when a client needs to recover a failed
+ * pipeline or set up a pipeline for appending to a block.
+ *
+ * @param block a block
+ * @param clientName the name of the client
+ * @return a located block with a new generation stamp and an access token
+ * @throws IOException if any error occurs
+ */
+ public LocatedBlock updateBlockForPipeline(Block block, String clientName)
+ throws IOException;
+
+ /**
+ * Update a pipeline for a block under construction
+ *
+ * @param clientName the name of the client
+ * @param oldBlock the old block
+ * @param newBlock the new block containing new generation stamp and length
+ * @param newNodes datanodes in the pipeline
+ * @throws IOException if any error occurs
+ */
+ public void updatePipeline(String clientName, Block oldBlock,
+ Block newBlock, DatanodeID[] newNodes)
+ throws IOException;
}
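
Taken together, the two new RPCs give the client a full recovery protocol: fetch a fresh generation stamp (which fences stale replicas), rebuild the pipeline with the survivors, then register the new pipeline. A hedged end-to-end sketch (survivors and bytesSent are illustrative client state):

  // Sketch: client-driven pipeline recovery with the new RPCs.
  // 1. New generation stamp + access token; stale replicas are fenced.
  LocatedBlock updated = namenode.updateBlockForPipeline(block, clientName);
  long newGS = updated.getBlock().getGenerationStamp();

  // 2. Re-run createBlockOutputStream() against the surviving datanodes,
  //    passing newGS so they bump their replicas to the new stamp.

  // 3. Record the recovered pipeline and block state at the namenode.
  Block newBlock = new Block(block.getBlockId(), bytesSent, newGS);
  namenode.updatePipeline(clientName, block, newBlock, survivors);
  block = newBlock;  // the stream continues on the recovered pipeline
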