You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2010/01/08 15:52:48 UTC
svn commit: r897222 [1/3] - in /hadoop/hdfs/branches/HDFS-326: ./
.eclipse.templates/ ivy/ src/c++/libhdfs/ src/contrib/
src/contrib/fuse-dfs/ src/contrib/fuse-dfs/src/ src/contrib/hdfsproxy/
src/docs/src/documentation/content/xdocs/ src/java/ src/java...
Author: stevel
Date: Fri Jan 8 14:52:46 2010
New Revision: 897222
URL: http://svn.apache.org/viewvc?rev=897222&view=rev
Log:
HDFS-326 Merge with SVN_HEAD of 2010-01-08
Added:
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java
- copied unchanged from r897215, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java
hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
- copied unchanged from r897215, hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
- copied unchanged from r897215, hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java
- copied unchanged from r897215, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestGetUriFromString.java
- copied unchanged from r897215, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestGetUriFromString.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/security/TestGroupMappingServiceRefresh.java
- copied unchanged from r897215, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/security/TestGroupMappingServiceRefresh.java
Removed:
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
Modified:
hadoop/hdfs/branches/HDFS-326/ (props changed)
hadoop/hdfs/branches/HDFS-326/.eclipse.templates/.classpath
hadoop/hdfs/branches/HDFS-326/CHANGES.txt
hadoop/hdfs/branches/HDFS-326/build.xml (contents, props changed)
hadoop/hdfs/branches/HDFS-326/ivy/ivysettings.xml
hadoop/hdfs/branches/HDFS-326/src/c++/libhdfs/ (props changed)
hadoop/hdfs/branches/HDFS-326/src/contrib/build-contrib.xml
hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/build.xml
hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/src/Makefile.am
hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh
hadoop/hdfs/branches/HDFS-326/src/contrib/hdfsproxy/ (props changed)
hadoop/hdfs/branches/HDFS-326/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
hadoop/hdfs/branches/HDFS-326/src/java/ (props changed)
hadoop/hdfs/branches/HDFS-326/src/java/hdfs-default.xml
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/Util.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java
hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/ (props changed)
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestFSInputChecker.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
hadoop/hdfs/branches/HDFS-326/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
hadoop/hdfs/branches/HDFS-326/src/webapps/datanode/ (props changed)
hadoop/hdfs/branches/HDFS-326/src/webapps/hdfs/ (props changed)
hadoop/hdfs/branches/HDFS-326/src/webapps/secondary/ (props changed)
Propchange: hadoop/hdfs/branches/HDFS-326/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 8 14:52:46 2010
@@ -1,4 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs:713112
/hadoop/hdfs/branches/HDFS-265:796829-820463
/hadoop/hdfs/branches/branch-0.21:820487
-/hadoop/hdfs/trunk:804973-885783
+/hadoop/hdfs/trunk:804973-897215
Modified: hadoop/hdfs/branches/HDFS-326/.eclipse.templates/.classpath
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/.eclipse.templates/.classpath?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/.eclipse.templates/.classpath (original)
+++ hadoop/hdfs/branches/HDFS-326/.eclipse.templates/.classpath Fri Jan 8 14:52:46 2010
@@ -9,20 +9,19 @@
<classpathentry kind="src" path="src/contrib/thriftfs/src/java"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
- <classpathentry kind="lib" path="lib/hadoop-core-0.22.0-dev.jar"/>
- <classpathentry kind="lib" path="lib/hadoop-core-test-0.22.0-dev.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/hadoop-core-0.22.0-SNAPSHOT.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/hadoop-core-test-0.22.0-SNAPSHOT.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-cli-1.2.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-codec-1.3.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-el-1.0.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-httpclient-3.0.1.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-logging-1.0.4.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-logging-api-1.0.4.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-logging-1.1.1.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-net-1.4.1.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/core-3.1.1.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/hsqldb-1.8.0.10.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jasper-compiler-5.5.12.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jasper-runtime-5.5.12.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jets3t-0.6.1.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jets3t-0.7.1.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jetty-6.1.14.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jetty-util-6.1.14.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jsp-2.1-6.1.14.jar"/>
@@ -32,16 +31,17 @@
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/log4j-1.2.15.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/oro-2.0.8.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/servlet-api-2.5-6.1.14.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/slf4j-api-1.4.3.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/slf4j-log4j12-1.4.3.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/slf4j-api-1.5.8.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/slf4j-log4j12-1.4.3.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/xmlenc-0.52.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/aspectjrt-1.5.3.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/mockito-all-1.8.0.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/aspectjrt-1.6.5.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.core.framework.uberjar.javaEE.14-1.8.0.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.integration.ant-1.8.0.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.integration.shared.api-1.8.0.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cargo-ant-0.9.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cargo-core-uberjar-0.9.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/standard-1.1.2.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/standard-1.1.2.jar"/>
<classpathentry kind="lib" path="src/contrib/thriftfs/lib/hadoopthriftapi.jar"/>
<classpathentry kind="lib" path="src/contrib/thriftfs/lib/libthrift.jar"/>
<classpathentry kind="lib" path="build/test/classes"/>
Modified: hadoop/hdfs/branches/HDFS-326/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/CHANGES.txt?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-326/CHANGES.txt Fri Jan 8 14:52:46 2010
@@ -18,29 +18,28 @@
IMPROVEMENTS
- HDFS-704. Unify build property names to facilitate cross-projects
- modifications (cos)
-
HDFS-703. Replace current fault injection implementation with one
from (cos)
HDFS-754. Reduce ivy console output to ovservable level (cos)
- HDFS-699. Add unit tests framework (Mockito) (cos, Eli Collins)
+ HDFS-832. HDFS side of HADOOP-6222. (cos)
- HDFS-630 In DFSOutputStream.nextBlockOutputStream(), the client can
- exclude specific datanodes when locating the next block
- (Cosmin Lehene via Stack)
+ HDFS-840. Change tests to use FileContext test helper introduced in
+ HADOOP-6394. (Jitendra Nath Pandey via suresh)
- HDFS-519. Create new tests for lease recovery (cos)
+ HDFS-685. Use the user-to-groups mapping service in the NameNode. (boryas, acmurthy)
+
+ HDFS-755. Read multiple checksum chunks at once in DFSInputStream.
+ (Todd Lipcon via tomwhite)
+
+ HDFS-786. Implement getContentSummary in HftpFileSystem.
+ (Tsz Wo (Nicholas), SZE via cdouglas)
OPTIMIZATIONS
BUG FIXES
- HDFS-646. Fix test-patch failure by adding test-contrib ant target.
- (gkesavan)
-
HDFS-695. RaidNode should read in configuration from hdfs-site.xml.
(dhruba)
@@ -49,8 +48,6 @@
HDFS-750. Fix build failure due to TestRename. (suresh)
- HDFS-733. TestBlockReport fails intermittently. (cos)
-
HDFS-712. Move libhdfs from mapreduce subproject to hdfs subproject.
(Eli Collins via dhruba)
@@ -62,13 +59,6 @@
HDFS-751. Fix TestCrcCorruption to pick up the correct datablocks to
corrupt. (dhruba)
- HDFS-774. Intermittent race condition in TestFiPipelines (cos)
-
- HDFS-741. TestHFlush test doesn't seek() past previously written part of
- the file (cos, szetszwo)
-
- HDFS-706. Intermittent failures in TestFiHFlush (cos)
-
HDFS-763. Fix slightly misleading report from DataBlockScanner
about corrupted scans. (dhruba)
@@ -81,7 +71,21 @@
HDFS-785. Add Apache license to several namenode unit tests.
(Ravi Phulari via jghoman)
- HDFS-791. Build is broken after HDFS-787 patch has been applied (cos)
+ HDFS-802. Update Eclipse configuration to match changes to Ivy
+ configuration (Edwin Chan via cos)
+
+ HDFS-423. Unbreak FUSE build and fuse_dfs_wrapper.sh (Eli Collins via cos)
+
+ HDFS-825. Build fails to pull latest hadoop-core-* artifacts (cos)
+
+ HDFS-94. The Heap Size printed in the NameNode WebUI is accurate.
+ (Dmytro Molkov via dhruba)
+
+ HDFS-767. An improved retry policy when the DFSClient is unable to fetch a
+ block from the datanode. (Ning Zhang via dhruba)
+
+ HDFS-187. Initialize secondary namenode http address in TestStartup.
+ (Todd Lipcon via szetszwo)
Release 0.21.0 - Unreleased
@@ -186,6 +190,8 @@
HDFS-631. Rename configuration keys towards API standardization and
backward compatibility. (Jitendra Nath Pandey via suresh)
+ HDFS-669. Add unit tests framework (Mockito) (cos, Eli Collins)
+
HDFS-731. Support new Syncable interface in HDFS. (hairong)
HDFS-702. Add HDFS implementation of AbstractFileSystem.
@@ -194,6 +200,9 @@
HDFS-758. Add decommissioning status page to Namenode Web UI.
(Jitendra Nath Pandey via suresh)
+ HDFS-814. Add an api to get the visible length of a DFSDataInputStream.
+ (szetszwo)
+
IMPROVEMENTS
HDFS-381. Remove blocks from DataNode maps when corresponding file
@@ -354,6 +363,9 @@
HDFS-680. Add new access method to a copy of a block's replica. (shv)
+ HDFS-704. Unify build property names to facilitate cross-projects
+ modifications (cos)
+
HDFS-705. Create an adapter to access some of package-private methods of
DataNode from tests (cos)
@@ -393,6 +405,12 @@
HDFS-787. Upgrade some libraries to be consistent with common and
mapreduce. (omalley)
+ HDFS-519. Create new tests for lease recovery (cos)
+
+ HDFS-804. New unit tests for concurrent lease recovery (cos)
+
+ HDFS-813. Enable the append test in TestReadWhileWriting. (szetszwo)
+
BUG FIXES
HDFS-76. Better error message to users when commands fail because of
@@ -540,6 +558,52 @@
HDFS-691. Fix an overflow error in DFSClient.DFSInputStream.available().
(szetszwo)
+ HDFS-733. TestBlockReport fails intermittently. (cos)
+
+ HDFS-774. Intermittent race condition in TestFiPipelines (cos)
+
+ HDFS-741. TestHFlush test doesn't seek() past previously written part of
+ the file (cos, szetszwo)
+
+ HDFS-706. Intermittent failures in TestFiHFlush (cos)
+
+ HDFS-646. Fix test-patch failure by adding test-contrib ant target.
+ (gkesavan)
+
+ HDFS-791. Build is broken after HDFS-787 patch has been applied (cos)
+
+ HDFS-792. TestHDFSCLI is failing. (Todd Lipcon via cos)
+
+ HDFS-781. Namenode metrics PendingDeletionBlocks is not decremented.
+ (Suresh)
+
+ HDFS-192. Fix TestBackupNode failures. (shv)
+
+ HDFS-797. TestHDFSCLI much slower after HDFS-265 merge. (Todd Lipcon via cos)
+
+ HDFS-824. Stop lease checker in TestReadWhileWriting. (szetszwo)
+
+ HDFS-823. CheckPointer should use addInternalServlet for image-fetching
+ servlet (jghoman)
+
+ HDFS-456. Fix URI generation for windows file paths. (shv)
+
+ HDFS-812. FSNamesystem#internalReleaseLease throws NullPointerException on
+ a single-block file's lease recovery. (cos)
+
+ HDFS-724. Pipeline hangs if one of the block receiver is not responsive.
+ (hairong)
+
+ HDFS-564. Adding pipeline tests 17-35. (hairong)
+
+ HDFS-849. TestFiDataTransferProtocol2#pipeline_Fi_18 sometimes fails.
+ (hairong)
+
+ HDFS-762. Balancer causes Null Pointer Exception.
+ (Cristian Ivascu via dhruba)
+
+ HDFS-868. Fix link to Hadoop Upgrade Wiki. (Chris A. Mattmann via shv)
+
Release 0.20.2 - Unreleased
IMPROVEMENTS
@@ -570,6 +634,15 @@
HDFS-596. Fix memory leak in hdfsFreeFileInfo() for libhdfs.
(Zhang Bingjun via dhruba)
+ HDFS-793. Data node should receive the whole packet ack message before it
+ constructs and sends its own ack message for the packet. (hairong)
+
+ HDFS-185. Disallow chown, chgrp, chmod, setQuota, and setSpaceQuota when
+ name-node is in safemode. (Ravi Phulari via shv)
+
+ HDFS-101. DFS write pipeline: DFSClient sometimes does not detect second
+ datanode failure. (hairong)
+
Release 0.20.1 - 2009-09-01
IMPROVEMENTS
Modified: hadoop/hdfs/branches/HDFS-326/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/build.xml?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-326/build.xml Fri Jan 8 14:52:46 2010
@@ -1086,6 +1086,8 @@
<env key="JVM_ARCH" value="${jvm.arch}"/>
<arg value="install"/>
</exec>
+ <!-- Create a build platform-agnostic link to c++ libs -->
+ <symlink overwrite="true" link="${build.dir}/c++/lib" resource="${install.c++}/lib"/>
</target>
<target name="compile-ant-tasks" depends="compile-core">
Propchange: hadoop/hdfs/branches/HDFS-326/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 8 14:52:46 2010
@@ -2,4 +2,4 @@
/hadoop/core/trunk/build.xml:779102
/hadoop/hdfs/branches/HDFS-265/build.xml:796829-820463
/hadoop/hdfs/branches/branch-0.21/build.xml:820487
-/hadoop/hdfs/trunk/build.xml:804973-885783
+/hadoop/hdfs/trunk/build.xml:804973-897215
Modified: hadoop/hdfs/branches/HDFS-326/ivy/ivysettings.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/ivy/ivysettings.xml?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/ivy/ivysettings.xml (original)
+++ hadoop/hdfs/branches/HDFS-326/ivy/ivysettings.xml Fri Jan 8 14:52:46 2010
@@ -39,14 +39,15 @@
<resolvers>
<ibiblio name="maven2" root="${repo.maven.org}" pattern="${maven2.pattern.ext}" m2compatible="true"/>
- <ibiblio name="apache-snapshot" root="${snapshot.apache.org}" m2compatible="true"/>
+ <ibiblio name="apache-snapshot" root="${snapshot.apache.org}" m2compatible="true"
+ checkmodified="true" changingPattern=".*SNAPSHOT"/>
<filesystem name="fs" m2compatible="true" force="true">
<artifact pattern="${repo.dir}/org/apache/hadoop/[module]/[revision]/[module]-[revision].[ext]"/>
<ivy pattern="${repo.dir}/org/apache/hadoop/[module]/[revision]/[module]-[revision].pom"/>
</filesystem>
- <chain name="default" dual="true">
+ <chain name="default" dual="true" checkmodified="true" changingPattern=".*SNAPSHOT">
<resolver ref="apache-snapshot"/>
<resolver ref="maven2"/>
</chain>
Propchange: hadoop/hdfs/branches/HDFS-326/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 8 14:52:46 2010
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
/hadoop/core/trunk/src/c++/libhdfs:776175-784663
-/hadoop/hdfs/trunk/src/c++/libhdfs:807691-885783
+/hadoop/hdfs/trunk/src/c++/libhdfs:807691-897215
Modified: hadoop/hdfs/branches/HDFS-326/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/contrib/build-contrib.xml?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/contrib/build-contrib.xml (original)
+++ hadoop/hdfs/branches/HDFS-326/src/contrib/build-contrib.xml Fri Jan 8 14:52:46 2010
@@ -43,6 +43,9 @@
<property name="test.timeout" value="900000"/>
<property name="build.dir" location="${hadoop.root}/build/contrib/${name}"/>
<property name="build.classes" location="${build.dir}/classes"/>
+ <!-- NB: sun.arch.data.model is not supported on all platforms -->
+ <property name="build.platform"
+ value="${os.name}-${os.arch}-${sun.arch.data.model}"/>
<property name="build.test" location="${build.dir}/test"/>
<property name="build.examples" location="${build.dir}/examples"/>
<property name="hadoop.log.dir" location="${build.dir}/test/logs"/>
Modified: hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/build.xml?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/build.xml Fri Jan 8 14:52:46 2010
@@ -32,9 +32,9 @@
<target name="check-libhdfs-exists" if="fusedfs">
- <property name="libhdfs.lib" value="${hadoop.root}/build/libhdfs/libhdfs.so"/>
+ <property name="libhdfs.lib" value="${hadoop.root}/build/c++/${build.platform}/lib/libhdfs.so"/>
<available file="${libhdfs.lib}" property="libhdfs-exists"/>
- <fail message="libhdfs.so does not exist: ${libhdfs.lib}. Please check flags -Dlibhdfs=1 -Dfusedfs=1 are set or first try ant compile-libhdfs -Dlibhdfs=1">
+ <fail message="libhdfs.so does not exist: ${libhdfs.lib}. Please check flags -Dlibhdfs=1 -Dfusedfs=1 are set or first try ant compile -Dcompile.c++=true -Dlibhdfs=true">
<condition>
<not><isset property="libhdfs-exists"/></not>
</condition>
@@ -59,7 +59,7 @@
<env key="OS_ARCH" value="${os.arch}"/>
<env key="HADOOP_HOME" value="${hadoop.root}"/>
<env key="PACKAGE_VERSION" value="0.1.0"/>
-
+ <env key="BUILD_PLATFORM" value="${build.platform}" />
<env key="PERMS" value="${perms}"/>
</exec>
<mkdir dir="${build.dir}"/>
Modified: hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/src/Makefile.am
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/src/Makefile.am?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/src/Makefile.am (original)
+++ hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/src/Makefile.am Fri Jan 8 14:52:46 2010
@@ -17,5 +17,4 @@
bin_PROGRAMS = fuse_dfs
fuse_dfs_SOURCES = fuse_dfs.c fuse_options.c fuse_trash.c fuse_stat_struct.c fuse_users.c fuse_init.c fuse_connect.c fuse_impls_access.c fuse_impls_chmod.c fuse_impls_chown.c fuse_impls_create.c fuse_impls_flush.c fuse_impls_getattr.c fuse_impls_mkdir.c fuse_impls_mknod.c fuse_impls_open.c fuse_impls_read.c fuse_impls_release.c fuse_impls_readdir.c fuse_impls_rename.c fuse_impls_rmdir.c fuse_impls_statfs.c fuse_impls_symlink.c fuse_impls_truncate.c fuse_impls_utimens.c fuse_impls_unlink.c fuse_impls_write.c
AM_CPPFLAGS= -DPERMS=$(PERMS) -D_FILE_OFFSET_BITS=64 -I$(JAVA_HOME)/include -I$(HADOOP_HOME)/src/c++/libhdfs/ -I$(JAVA_HOME)/include/linux/ -D_FUSE_DFS_VERSION=\"$(PACKAGE_VERSION)\" -DPROTECTED_PATHS=\"$(PROTECTED_PATHS)\" -I$(FUSE_HOME)/include
-AM_LDFLAGS= -L$(HADOOP_HOME)/build/libhdfs -lhdfs -L$(FUSE_HOME)/lib -lfuse -L$(JAVA_HOME)/jre/lib/$(OS_ARCH)/server -ljvm
-
+AM_LDFLAGS= -L$(HADOOP_HOME)/build/c++/$(BUILD_PLATFORM)/lib -lhdfs -L$(FUSE_HOME)/lib -lfuse -L$(JAVA_HOME)/jre/lib/$(OS_ARCH)/server -ljvm
Modified: hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh (original)
+++ hadoop/hdfs/branches/HDFS-326/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh Fri Jan 8 14:52:46 2010
@@ -1,3 +1,4 @@
+#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -19,12 +20,6 @@
export HADOOP_HOME=/usr/local/share/hadoop
fi
-export PATH=$HADOOP_HOME/contrib/fuse_dfs:$PATH
-
-for f in ls $HADOOP_HOME/lib/*.jar $HADOOP_HOME/*.jar ; do
-export CLASSPATH=$CLASSPATH:$f
-done
-
if [ "$OS_ARCH" = "" ]; then
export OS_ARCH=amd64
fi
@@ -37,4 +32,17 @@
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/$OS_ARCH/server:/usr/local/share/hdfs/libhdfs/:/usr/local/lib
fi
-./fuse_dfs $@
+# If dev build set paths accordingly
+if [ -d $HADOOP_HDFS_HOME/build ]; then
+ export HADOOP_HOME=$HADOOP_HDFS_HOME
+ for f in ${HADOOP_HOME}/build/*.jar ; do
+ export CLASSPATH=$CLASSPATH:$f
+ done
+ for f in $HADOOP_HOME/build/ivy/lib/Hadoop-Hdfs/common/*.jar ; do
+ export CLASSPATH=$CLASSPATH:$f
+ done
+ export PATH=$HADOOP_HOME/build/contrib/fuse-dfs:$PATH
+ export LD_LIBRARY_PATH=$HADOOP_HOME/build/c++/lib:$JAVA_HOME/jre/lib/$OS_ARCH/server
+fi
+
+fuse_dfs $@
Propchange: hadoop/hdfs/branches/HDFS-326/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 8 14:52:46 2010
@@ -2,4 +2,4 @@
/hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
/hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy:820487
-/hadoop/hdfs/trunk/src/contrib/hdfsproxy:804973-885783
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:804973-897215
Modified: hadoop/hdfs/branches/HDFS-326/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml (original)
+++ hadoop/hdfs/branches/HDFS-326/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml Fri Jan 8 14:52:46 2010
@@ -530,7 +530,7 @@
of Hadoop and rollback the cluster to the state it was in
before
the upgrade. HDFS upgrade is described in more detail in
- <a href="http://wiki.apache.org/hadoop/Hadoop%20Upgrade">Hadoop Upgrade</a> Wiki page.
+ <a href="http://wiki.apache.org/hadoop/Hadoop_Upgrade">Hadoop Upgrade</a> Wiki page.
HDFS can have one such backup at a time. Before upgrading,
administrators need to remove existing backup using <code>bin/hadoop
dfsadmin -finalizeUpgrade</code> command. The following
Propchange: hadoop/hdfs/branches/HDFS-326/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 8 14:52:46 2010
@@ -2,4 +2,4 @@
/hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
/hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/java:820487
-/hadoop/hdfs/trunk/src/java:804973-885783
+/hadoop/hdfs/trunk/src/java:804973-897215
Modified: hadoop/hdfs/branches/HDFS-326/src/java/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/hdfs-default.xml?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/hdfs-default.xml (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/hdfs-default.xml Fri Jan 8 14:52:46 2010
@@ -169,7 +169,7 @@
<property>
<name>dfs.namenode.name.dir</name>
- <value>${hadoop.tmp.dir}/dfs/name</value>
+ <value>file://${hadoop.tmp.dir}/dfs/name</value>
<description>Determines where on the local filesystem the DFS name node
should store the name table(fsimage). If this is a comma-delimited list
of directories then the name table is replicated in all of the
@@ -447,7 +447,7 @@
<property>
<name>dfs.namenode.checkpoint.dir</name>
- <value>${hadoop.tmp.dir}/dfs/namesecondary</value>
+ <value>file://${hadoop.tmp.dir}/dfs/namesecondary</value>
<description>Determines where on the local filesystem the DFS secondary
name node should store the temporary images to merge.
If this is a comma-delimited list of directories then the image is
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java Fri Jan 8 14:52:46 2010
@@ -87,6 +87,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -1510,26 +1511,60 @@
}
}
- int chunkLen = Math.min(dataLeft, bytesPerChecksum);
-
- if ( chunkLen > 0 ) {
- // len should be >= chunkLen
- IOUtils.readFully(in, buf, offset, chunkLen);
- checksumBytes.get(checksumBuf, 0, checksumSize);
+ // Sanity checks
+ assert len >= bytesPerChecksum;
+ assert checksum != null;
+ assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
+
+
+ int checksumsToRead, bytesToRead;
+
+ if (checksumSize > 0) {
+
+ // How many chunks left in our stream - this is a ceiling
+ // since we may have a partial chunk at the end of the file
+ int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
+
+ // How many chunks we can fit in databuffer
+ // - note this is a floor since we always read full chunks
+ int chunksCanFit = Math.min(len / bytesPerChecksum,
+ checksumBuf.length / checksumSize);
+
+ // How many chunks should we read
+ checksumsToRead = Math.min(chunksLeft, chunksCanFit);
+ // How many bytes should we actually read
+ bytesToRead = Math.min(
+ checksumsToRead * bytesPerChecksum, // full chunks
+ dataLeft); // in case we have a partial
+ } else {
+ // no checksum
+ bytesToRead = Math.min(dataLeft, len);
+ checksumsToRead = 0;
+ }
+
+ if ( bytesToRead > 0 ) {
+ // Assert we have enough space
+ assert bytesToRead <= len;
+ assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
+ assert checksumBuf.length >= checksumSize * checksumsToRead;
+ IOUtils.readFully(in, buf, offset, bytesToRead);
+ checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
}
- dataLeft -= chunkLen;
+ dataLeft -= bytesToRead;
+ assert dataLeft >= 0;
+
lastChunkOffset = chunkOffset;
- lastChunkLen = chunkLen;
+ lastChunkLen = bytesToRead;
- if ((dataLeft == 0 && isLastPacket) || chunkLen == 0) {
+ if ((dataLeft == 0 && isLastPacket) || bytesToRead == 0) {
gotEOS = true;
}
- if ( chunkLen == 0 ) {
+ if ( bytesToRead == 0 ) {
return -1;
}
-
- return chunkLen;
+
+ return bytesToRead;
}
private BlockReader( String file, long blockId, DataInputStream in,
@@ -1661,7 +1696,7 @@
* DFSInputStream provides bytes from a named file. It handles
* negotiation of the namenode and various datanodes as necessary.
****************************************************************/
- class DFSInputStream extends FSInputStream {
+ private class DFSInputStream extends FSInputStream {
private Socket s = null;
private boolean closed = false;
@@ -1676,6 +1711,7 @@
private long pos = 0;
private long blockEnd = -1;
private int failures = 0;
+ private int timeWindow = 3000; // wait time window (in msec) if BlockMissingException is caught
/* XXX Use of CocurrentHashMap is temp fix. Need to fix
* parallel accesses to DFSInputStream (through ptreads) properly */
@@ -1695,6 +1731,7 @@
this.buffersize = buffersize;
this.src = src;
prefetchSize = conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, prefetchSize);
+ timeWindow = conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
openInfo();
}
@@ -2147,7 +2184,19 @@
+ " from any node: " + ie
+ ". Will get new block locations from namenode and retry...");
try {
- Thread.sleep(3000);
+ // Introducing a random factor to the wait time before another retry.
+ // The wait time is dependent on # of failures and a random factor.
+ // At the first time of getting a BlockMissingException, the wait time
+ // is a random number between 0..3000 ms. If the first retry
+ // still fails, we will wait 3000 ms grace period before the 2nd retry.
+ // Also at the second retry, the waiting window is expanded to 6000 ms
+ // alleviating the request rate from the server. Similarly the 3rd retry
+ // will wait 6000ms grace period before retry and the waiting window is
+ // expanded to 9000ms.
+ double waitTime = timeWindow * failures + // grace period for the last round of attempt
+ timeWindow * (failures + 1) * r.nextDouble(); // expanding time window for each failure
+ LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
+ Thread.sleep((long)waitTime);
} catch (InterruptedException iex) {
}
deadNodes.clear(); //2nd option is to remove only nodes[blockId]
@@ -2392,6 +2441,9 @@
}
}
+ /**
+ * The Hdfs implementation of {@link FSDataInputStream}
+ */
public static class DFSDataInputStream extends FSDataInputStream {
public DFSDataInputStream(DFSInputStream in)
throws IOException {
@@ -2419,6 +2471,12 @@
return ((DFSInputStream)in).getAllBlocks();
}
+ /**
+ * @return The visible length of the file.
+ */
+ public long getVisibleLength() throws IOException {
+ return ((DFSInputStream)in).getFileLength();
+ }
}
/****************************************************************
@@ -2482,7 +2540,27 @@
int dataPos;
int checksumStart;
int checksumPos;
-
+ private static final long HEART_BEAT_SEQNO = -1L;
+
+ /**
+ * create a heartbeat packet
+ */
+ Packet() {
+ this.lastPacketInBlock = false;
+ this.numChunks = 0;
+ this.offsetInBlock = 0;
+ this.seqno = HEART_BEAT_SEQNO;
+
+ buffer = null;
+ int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+ buf = new byte[packetSize];
+
+ checksumStart = dataStart = packetSize;
+ checksumPos = checksumStart;
+ dataPos = dataStart;
+ maxChunks = 0;
+ }
+
// create a new packet
Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
this.lastPacketInBlock = false;
@@ -2569,6 +2647,14 @@
return offsetInBlock + dataPos - dataStart;
}
+ /**
+ * Check if this packet is a heart beat packet
+ * @return true if the sequence number is HEART_BEAT_SEQNO
+ */
+ private boolean isHeartbeatPacket() {
+ return seqno == HEART_BEAT_SEQNO;
+ }
+
public String toString() {
return "packet seqno:" + this.seqno +
" offsetInBlock:" + this.offsetInBlock +
@@ -2593,7 +2679,6 @@
private DataInputStream blockReplyStream;
private ResponseProcessor response = null;
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
- private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
volatile boolean hasError = false;
volatile int errorIndex = -1;
private BlockConstructionStage stage; // block construction stage
@@ -2687,6 +2772,7 @@
* and closes them. Any error recovery is also done by this thread.
*/
public void run() {
+ long lastPacket = System.currentTimeMillis();
while (!streamerClosed && clientRunning) {
// if the Responder encountered an error, shutdown Responder
@@ -2710,19 +2796,32 @@
synchronized (dataQueue) {
// wait for a packet to be sent.
+ long now = System.currentTimeMillis();
while ((!streamerClosed && !hasError && clientRunning
- && dataQueue.size() == 0) || doSleep) {
+ && dataQueue.size() == 0 &&
+ (stage != BlockConstructionStage.DATA_STREAMING ||
+ stage == BlockConstructionStage.DATA_STREAMING &&
+ now - lastPacket < socketTimeout/2)) || doSleep ) {
+ long timeout = socketTimeout/2 - (now-lastPacket);
+ timeout = timeout <= 0 ? 1000 : timeout;
+ timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
+ timeout : 1000;
try {
- dataQueue.wait(1000);
+ dataQueue.wait(timeout);
} catch (InterruptedException e) {
}
doSleep = false;
+ now = System.currentTimeMillis();
}
- if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
+ if (streamerClosed || hasError || !clientRunning) {
continue;
}
// get packet to be sent.
- one = dataQueue.getFirst();
+ if (dataQueue.isEmpty()) {
+ one = new Packet(); // heartbeat packet
+ } else {
+ one = dataQueue.getFirst(); // regular data packet
+ }
}
// get new block from namenode.
@@ -2768,9 +2867,11 @@
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
- dataQueue.removeFirst();
- ackQueue.addLast(one);
- dataQueue.notifyAll();
+ if (!one.isHeartbeatPacket()) {
+ dataQueue.removeFirst();
+ ackQueue.addLast(one);
+ dataQueue.notifyAll();
+ }
}
if (LOG.isDebugEnabled()) {
@@ -2781,6 +2882,10 @@
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
blockStream.flush();
+ lastPacket = System.currentTimeMillis();
+
+ if (one.isHeartbeatPacket()) { //heartbeat packet
+ }
// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
@@ -2849,6 +2954,9 @@
*/
void close(boolean force) {
streamerClosed = true;
+ synchronized (dataQueue) {
+ dataQueue.notifyAll();
+ }
if (force) {
this.interrupt();
}
@@ -2902,45 +3010,22 @@
public void run() {
this.setName("ResponseProcessor for block " + block);
+ PipelineAck ack = new PipelineAck();
while (!responderClosed && clientRunning && !isLastPacketInBlock) {
// process responses from datanodes.
try {
- // verify seqno from datanode
- long seqno = blockReplyStream.readLong();
- LOG.debug("DFSClient received ack for seqno " + seqno);
- Packet one = null;
- if (seqno == -1) {
- continue;
- } else if (seqno == -2) {
- // no nothing
- } else {
- synchronized (dataQueue) {
- one = ackQueue.getFirst();
- }
- if (one.seqno != seqno) {
- throw new IOException("Responseprocessor: Expecting seqno " +
- " for block " + block +
- one.seqno + " but received " + seqno);
- }
- isLastPacketInBlock = one.lastPacketInBlock;
- }
-
- // processes response status from all datanodes.
- String replies = null;
+ // read an ack from the pipeline
+ ack.readFields(blockReplyStream);
if (LOG.isDebugEnabled()) {
- replies = "DFSClient Replies for seqno " + seqno + " are";
+ LOG.debug("DFSClient " + ack);
}
- for (int i = 0; i < targets.length && clientRunning; i++) {
- final DataTransferProtocol.Status reply
- = DataTransferProtocol.Status.read(blockReplyStream);
- if (LOG.isDebugEnabled()) {
- replies += " " + reply;
- }
+
+ long seqno = ack.getSeqno();
+ // processes response status from datanodes.
+ for (int i = ack.getNumOfReplies()-1; i >=0 && clientRunning; i--) {
+ final DataTransferProtocol.Status reply = ack.getReply(i);
if (reply != SUCCESS) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(replies);
- }
errorIndex = i; // first bad datanode
throw new IOException("Bad response " + reply +
" for block " + block +
@@ -2948,16 +3033,24 @@
targets[i].getName());
}
}
+
+ assert seqno != PipelineAck.UNKOWN_SEQNO :
+ "Ack for unkown seqno should be a failed ack: " + ack;
+ if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
+ continue;
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug(replies);
+ // a success ack for a data packet
+ Packet one = null;
+ synchronized (dataQueue) {
+ one = ackQueue.getFirst();
}
-
- if (one == null) {
- throw new IOException("Panic: responder did not receive " +
- "an ack for a packet: " + seqno);
+ if (one.seqno != seqno) {
+ throw new IOException("Responseprocessor: Expecting seqno " +
+ " for block " + block +
+ one.seqno + " but received " + seqno);
}
-
+ isLastPacketInBlock = one.lastPacketInBlock;
// update bytesAcked
block.setNumBytes(one.getLastByteOffsetBlock());
@@ -3118,9 +3211,7 @@
success = false;
long startTime = System.currentTimeMillis();
- DatanodeInfo[] w = excludedNodes.toArray(
- new DatanodeInfo[excludedNodes.size()]);
- lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
+ lb = locateFollowingBlock(startTime);
block = lb.getBlock();
block.setNumBytes(0);
accessToken = lb.getAccessToken();
@@ -3136,16 +3227,12 @@
namenode.abandonBlock(block, src, clientName);
block = null;
- LOG.info("Excluding datanode " + nodes[errorIndex]);
- excludedNodes.add(nodes[errorIndex]);
-
// Connection failed. Let's wait a little bit and retry
retry = true;
try {
if (System.currentTimeMillis() - startTime > 5000) {
LOG.info("Waiting to find target node: " + nodes[0].getName());
}
- //TODO fix this timout. Extract it o a constant, maybe make it available from conf
Thread.sleep(6000);
} catch (InterruptedException iex) {
}
@@ -3243,15 +3330,14 @@
}
}
- private LocatedBlock locateFollowingBlock(long start,
- DatanodeInfo[] excludedNodes) throws IOException {
+ private LocatedBlock locateFollowingBlock(long start) throws IOException {
int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
long sleeptime = 400;
while (true) {
long localstart = System.currentTimeMillis();
while (true) {
try {
- return namenode.addBlock(src, clientName, block, excludedNodes);
+ return namenode.addBlock(src, clientName, block);
} catch (RemoteException e) {
IOException ue =
e.unwrapRemoteException(FileNotFoundException.class,
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jan 8 14:52:46 2010
@@ -95,6 +95,7 @@
public static final String DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir";
public static final String DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir";
public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size";
+ public static final String DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
public static final String DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
public static final String DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
public static final String DFS_DATANODE_STORAGEID_KEY = "dfs.datanode.StorageId";
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Fri Jan 8 14:52:46 2010
@@ -22,6 +22,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.security.authorize.Service;
@@ -41,6 +42,8 @@
new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
new Service("security.refresh.policy.protocol.acl",
RefreshAuthorizationPolicyProtocol.class),
+ new Service("security.refresh.usertogroups.mappings.protocol.acl",
+ RefreshUserToGroupMappingsProtocol.class),
};
@Override
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java Fri Jan 8 14:52:46 2010
@@ -36,6 +36,7 @@
import javax.security.auth.login.LoginException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -329,4 +330,101 @@
throw new IOException("Not supported");
}
+ /**
+ * A parser for parsing {@link ContentSummary} xml.
+ */
+ private class ContentSummaryParser extends DefaultHandler {
+ private ContentSummary contentsummary;
+
+ /** {@inheritDoc} */
+ public void startElement(String ns, String localname, String qname,
+ Attributes attrs) throws SAXException {
+ if (!ContentSummary.class.getName().equals(qname)) {
+ if (RemoteException.class.getSimpleName().equals(qname)) {
+ throw new SAXException(RemoteException.valueOf(attrs));
+ }
+ throw new SAXException("Unrecognized entry: " + qname);
+ }
+
+ contentsummary = toContentSummary(attrs);
+ }
+
+ /**
+ * Connect to the name node and get content summary.
+ * @param path The path
+ * @return The content summary for the path.
+ * @throws IOException
+ */
+ private ContentSummary getContentSummary(String path) throws IOException {
+ final HttpURLConnection connection = openConnection(
+ "/contentSummary" + path, "ugi=" + ugi);
+ InputStream in = null;
+ try {
+ in = connection.getInputStream();
+
+ final XMLReader xr = XMLReaderFactory.createXMLReader();
+ xr.setContentHandler(this);
+ xr.parse(new InputSource(in));
+ } catch(FileNotFoundException fnfe) {
+ //the server may not support getContentSummary
+ return null;
+ } catch(SAXException saxe) {
+ final Exception embedded = saxe.getException();
+ if (embedded != null && embedded instanceof IOException) {
+ throw (IOException)embedded;
+ }
+ throw new IOException("Invalid xml format", saxe);
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ connection.disconnect();
+ }
+ return contentsummary;
+ }
+ }
+
+ /** Return the object represented in the attributes. */
+ private static ContentSummary toContentSummary(Attributes attrs
+ ) throws SAXException {
+ final String length = attrs.getValue("length");
+ final String fileCount = attrs.getValue("fileCount");
+ final String directoryCount = attrs.getValue("directoryCount");
+ final String quota = attrs.getValue("quota");
+ final String spaceConsumed = attrs.getValue("spaceConsumed");
+ final String spaceQuota = attrs.getValue("spaceQuota");
+
+ if (length == null
+ || fileCount == null
+ || directoryCount == null
+ || quota == null
+ || spaceConsumed == null
+ || spaceQuota == null) {
+ return null;
+ }
+
+ try {
+ return new ContentSummary(
+ Long.parseLong(length),
+ Long.parseLong(fileCount),
+ Long.parseLong(directoryCount),
+ Long.parseLong(quota),
+ Long.parseLong(spaceConsumed),
+ Long.parseLong(spaceQuota));
+ } catch(Exception e) {
+ throw new SAXException("Invalid attributes: length=" + length
+ + ", fileCount=" + fileCount
+ + ", directoryCount=" + directoryCount
+ + ", quota=" + quota
+ + ", spaceConsumed=" + spaceConsumed
+ + ", spaceQuota=" + spaceQuota, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public ContentSummary getContentSummary(Path f) throws IOException {
+ final String s = makeQualified(f).toUri().getPath();
+ final ContentSummary cs = new ContentSummaryParser().getContentSummary(s);
+ return cs != null? cs: super.getContentSummary(f);
+ }
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Jan 8 14:52:46 2010
@@ -198,9 +198,6 @@
public LocatedBlock addBlock(String src, String clientName,
Block previous) throws IOException;
- public LocatedBlock addBlock(String src, String clientName,
- Block previous, DatanodeInfo[] excludedNode) throws IOException;
-
/**
* The client is done writing data to the given filename, and would
* like to complete it.
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Fri Jan 8 14:52:46 2010
@@ -26,6 +26,7 @@
import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
@@ -39,12 +40,11 @@
* when protocol changes. It is not very obvious.
*/
/*
- * Version 17:
- * Change the block write protocol to support pipeline recovery.
- * Additional fields, like recovery flags, new GS, minBytesRcvd,
- * and maxBytesRcvd are included.
+ * Version 19:
+ * Change the block packet ack protocol to include seqno,
+ * numberOfReplies, reply0, reply1, ...
*/
- public static final int DATA_TRANSFER_VERSION = 17;
+ public static final int DATA_TRANSFER_VERSION = 19;
/** Operation */
public enum Op {
@@ -453,4 +453,98 @@
return t;
}
}
+
+ /** reply **/
+ public static class PipelineAck implements Writable {
+ private long seqno;
+ private Status replies[];
+ public final static long UNKOWN_SEQNO = -2;
+
+ /** default constructor **/
+ public PipelineAck() {
+ }
+
+ /**
+ * Constructor
+ * @param seqno sequence number
+ * @param replies an array of replies
+ */
+ public PipelineAck(long seqno, Status[] replies) {
+ this.seqno = seqno;
+ this.replies = replies;
+ }
+
+ /**
+ * Get the sequence number
+ * @return the sequence number
+ */
+ public long getSeqno() {
+ return seqno;
+ }
+
+ /**
+ * Get the number of replies
+ * @return the number of replies
+ */
+ public short getNumOfReplies() {
+ return (short)replies.length;
+ }
+
+ /**
+ * get the ith reply
+ * @return the the ith reply
+ */
+ public Status getReply(int i) {
+ if (i<0 || i>=replies.length) {
+ throw new IllegalArgumentException("The input parameter " + i +
+ " should in the range of [0, " + replies.length);
+ }
+ return replies[i];
+ }
+
+ /**
+ * Check if this ack contains error status
+ * @return true if all statuses are SUCCESS
+ */
+ public boolean isSuccess() {
+ for (Status reply : replies) {
+ if (reply != Status.SUCCESS) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**** Writable interface ****/
+ @Override // Writable
+ public void readFields(DataInput in) throws IOException {
+ seqno = in.readLong();
+ short numOfReplies = in.readShort();
+ replies = new Status[numOfReplies];
+ for (int i=0; i<numOfReplies; i++) {
+ replies[i] = Status.read(in);
+ }
+ }
+
+ @Override // Writable
+ public void write(DataOutput out) throws IOException {
+ //WritableUtils.writeVLong(out, seqno);
+ out.writeLong(seqno);
+ out.writeShort((short)replies.length);
+ for(Status reply : replies) {
+ reply.write(out);
+ }
+ }
+
+ @Override //Object
+ public String toString() {
+ StringBuilder ack = new StringBuilder("Replies for seqno ");
+ ack.append( seqno ).append( " are" );
+ for(Status reply : replies) {
+ ack.append(" ");
+ ack.append(reply);
+ }
+ return ack.toString();
+ }
+ }
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Jan 8 14:52:46 2010
@@ -789,7 +789,6 @@
/** Default constructor */
Balancer() throws UnsupportedActionException {
- checkReplicationPolicyCompatibility(getConf());
}
/** Construct a balancer from the given configuration */
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/Util.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/Util.java?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/Util.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/Util.java Fri Jan 8 14:52:46 2010
@@ -17,7 +17,19 @@
*/
package org.apache.hadoop.hdfs.server.common;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
public final class Util {
+ private final static Log LOG = LogFactory.getLog(Util.class.getName());
+
/**
* Current system time.
* @return current time in msec.
@@ -25,4 +37,58 @@
public static long now() {
return System.currentTimeMillis();
}
-}
\ No newline at end of file
+
+ /**
+ * Interprets the passed string as a URI. In case of error it
+ * assumes the specified string is a file.
+ *
+ * @param s the string to interpret
+ * @return the resulting URI
+ * @throws IOException
+ */
+ public static URI stringAsURI(String s) throws IOException {
+ URI u = null;
+ // try to make a URI
+ try {
+ u = new URI(s);
+ } catch (URISyntaxException e){
+ LOG.warn("Path " + s + " should be specified as a URI " +
+ "in configuration files. Please update hdfs configuration.", e);
+ }
+
+ // if URI is null or scheme is undefined, then assume it's file://
+ if(u == null || u.getScheme() == null){
+ u = fileAsURI(new File(s));
+ }
+ return u;
+ }
+
+ /**
+ * Converts the passed File to a URI.
+ *
+ * @param f the file to convert
+ * @return the resulting URI
+ * @throws IOException
+ */
+ public static URI fileAsURI(File f) throws IOException {
+ return f.getCanonicalFile().toURI();
+ }
+
+ /**
+ * Converts a collection of strings into a collection of URIs.
+ * @param names collection of strings to convert to URIs
+ * @return collection of URIs
+ */
+ public static Collection<URI> stringCollectionAsURIs(
+ Collection<String> names) {
+ Collection<URI> uris = new ArrayList<URI>(names.size());
+ for(String name : names) {
+ try {
+ uris.add(stringAsURI(name));
+ } catch (IOException e) {
+ LOG.error("Error while processing URI: " + name, e);
+ }
+ }
+ return uris;
+ }
+}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Jan 8 14:52:46 2010
@@ -35,11 +35,11 @@
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
@@ -77,6 +77,7 @@
private Checksum partialCrc = null;
private final DataNode datanode;
final private ReplicaInPipelineInterface replicaInfo;
+ volatile private boolean mirrorError;
BlockReceiver(Block block, DataInputStream in, String inAddr,
String myAddr, BlockConstructionStage stage,
@@ -217,21 +218,19 @@
/**
* While writing to mirrorOut, failure to write to mirror should not
- * affect this datanode unless a client is writing the block.
+ * affect this datanode unless it is caused by interruption.
*/
private void handleMirrorOutError(IOException ioe) throws IOException {
LOG.info(datanode.dnRegistration + ":Exception writing block " +
block + " to mirror " + mirrorAddr + "\n" +
StringUtils.stringifyException(ioe));
- mirrorOut = null;
- //
- // If stream-copy fails, continue
- // writing to disk for replication requests. For client
- // writes, return error so that the client can do error
- // recovery.
- //
- if (clientName.length() > 0) {
+ if (Thread.interrupted()) { // shut down if the thread is interrupted
throw ioe;
+ } else { // encounter an error while writing to mirror
+ // continue to run even if can not write to mirror
+ // notify client of the error
+ // and wait for the client to shut down the pipeline
+ mirrorError = true;
}
}
@@ -433,6 +432,14 @@
return receivePacket(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
}
+ /**
+ * Write the received packet to disk (data only)
+ */
+ private void writePacketToDisk(byte[] pktBuf, int startByteToDisk,
+ int numBytesToDisk) throws IOException {
+ out.write(pktBuf, startByteToDisk, numBytesToDisk);
+ }
+
/**
* Receives and processes a packet. It can contain many chunks.
* returns the number of data bytes that the packet has.
@@ -461,7 +468,7 @@
}
//First write the packet to the mirror:
- if (mirrorOut != null) {
+ if (mirrorOut != null && !mirrorError) {
try {
mirrorOut.write(buf.array(), buf.position(), buf.remaining());
mirrorOut.flush();
@@ -469,7 +476,7 @@
handleMirrorOutError(e);
}
}
-
+
buf.position(endOfHeader);
if (lastPacketInBlock || len == 0) {
@@ -525,7 +532,7 @@
int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
- out.write(pktBuf, startByteToDisk, numBytesToDisk);
+ writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
@@ -560,7 +567,7 @@
throttler.throttle(len);
}
- return len;
+ return lastPacketInBlock?-1:len;
}
void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -584,14 +591,15 @@
if (clientName.length() > 0) {
responder = new Daemon(datanode.threadGroup,
new PacketResponder(this, block, mirrIn,
- replyOut, numTargets));
+ replyOut, numTargets,
+ Thread.currentThread()));
responder.start(); // start thread to processes reponses
}
/*
- * Receive until packet has zero bytes of data.
+ * Receive until the last packet.
*/
- while (receivePacket() > 0) {}
+ while (receivePacket() >= 0) {}
// wait for all outstanding packet responses. And then
// indicate responder to gracefully shutdown.
@@ -729,13 +737,16 @@
DataOutputStream replyOut; // output to upstream datanode
private int numTargets; // number of downstream datanodes including myself
private BlockReceiver receiver; // The owner of this responder.
+ private Thread receiverThread; // the thread that spawns this responder
public String toString() {
return "PacketResponder " + numTargets + " for Block " + this.block;
}
PacketResponder(BlockReceiver receiver, Block b, DataInputStream in,
- DataOutputStream out, int numTargets) {
+ DataOutputStream out, int numTargets,
+ Thread receiverThread) {
+ this.receiverThread = receiverThread;
this.receiver = receiver;
this.block = b;
mirrorIn = in;
@@ -775,145 +786,31 @@
notifyAll();
}
- private synchronized void lastDataNodeRun() {
- long lastHeartbeat = System.currentTimeMillis();
- boolean lastPacket = false;
- final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-
- while (running && datanode.shouldRun && !lastPacket) {
- long now = System.currentTimeMillis();
- try {
-
- // wait for a packet to be sent to downstream datanode
- while (running && datanode.shouldRun && ackQueue.size() == 0) {
- long idle = now - lastHeartbeat;
- long timeout = (datanode.socketTimeout/2) - idle;
- if (timeout <= 0) {
- timeout = 1000;
- }
- try {
- wait(timeout);
- } catch (InterruptedException e) {
- if (running) {
- LOG.info("PacketResponder " + numTargets +
- " for block " + block + " Interrupted.");
- running = false;
- }
- break;
- }
-
- // send a heartbeat if it is time.
- now = System.currentTimeMillis();
- if (now - lastHeartbeat > datanode.socketTimeout/2) {
- replyOut.writeLong(-1); // send heartbeat
- replyOut.flush();
- lastHeartbeat = now;
- }
- }
-
- if (!running || !datanode.shouldRun) {
- break;
- }
- Packet pkt = ackQueue.getFirst();
- long expected = pkt.seqno;
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " acking for packet " + expected);
-
- // If this is the last packet in block, then close block
- // file and finalize the block before responding success
- if (pkt.lastPacketInBlock) {
- receiver.close();
- final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
- block.setNumBytes(replicaInfo.getNumBytes());
- datanode.data.finalizeBlock(block);
- datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
- if (ClientTraceLog.isInfoEnabled() &&
- receiver.clientName.length() > 0) {
- long offset = 0;
- ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
- receiver.inAddr, receiver.myAddr, block.getNumBytes(),
- "HDFS_WRITE", receiver.clientName, offset,
- datanode.dnRegistration.getStorageID(), block, endTime-startTime));
- } else {
- LOG.info("Received block " + block +
- " of size " + block.getNumBytes() +
- " from " + receiver.inAddr);
- }
- lastPacket = true;
- }
-
- ackReply(expected);
- replyOut.flush();
- // remove the packet from the ack queue
- removeAckHead();
- // update the bytes acked
- if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
- replicaInfo.setBytesAcked(pkt.lastByteInBlock);
- }
- } catch (Exception e) {
- LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
- if (running) {
- try {
- datanode.checkDiskError(e); // may throw an exception here
- } catch (IOException ioe) {
- LOG.warn("DataNode.chekDiskError failed in lastDataNodeRun with: ",
- ioe);
- }
- LOG.info("PacketResponder " + block + " " + numTargets +
- " Exception " + StringUtils.stringifyException(e));
- running = false;
- }
- }
- }
- LOG.info("PacketResponder " + numTargets +
- " for block " + block + " terminating");
- }
-
- // This method is introduced to facilitate testing. Otherwise
- // there was a little chance to bind an AspectJ advice to such a sequence
- // of calls
- private void ackReply(long expected) throws IOException {
- replyOut.writeLong(expected);
- SUCCESS.write(replyOut);
- }
-
/**
* Thread to process incoming acks.
* @see java.lang.Runnable#run()
*/
public void run() {
-
- // If this is the last datanode in pipeline, then handle differently
- if (numTargets == 0) {
- lastDataNodeRun();
- return;
- }
-
boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (running && datanode.shouldRun && !lastPacketInBlock) {
boolean isInterrupted = false;
try {
- DataTransferProtocol.Status op = SUCCESS;
- boolean didRead = false;
Packet pkt = null;
long expected = -2;
- try {
- // read seqno from downstream datanode
- long seqno = mirrorIn.readLong();
- didRead = true;
- if (seqno == -1) {
- replyOut.writeLong(-1); // send keepalive
- replyOut.flush();
- LOG.debug("PacketResponder " + numTargets + " got -1");
- continue;
- } else if (seqno == -2) {
- LOG.debug("PacketResponder " + numTargets + " got -2");
- } else {
- LOG.debug("PacketResponder " + numTargets + " got seqno = " +
- seqno);
+ PipelineAck ack = new PipelineAck();
+ long seqno = PipelineAck.UNKOWN_SEQNO;
+ try {
+ if (numTargets != 0 && !mirrorError) {// not the last DN & no mirror error
+ // read an ack from downstream datanode
+ ack.readFields(mirrorIn);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets + " got " + ack);
+ }
+ seqno = ack.getSeqno();
+ }
+ if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
@@ -922,17 +819,14 @@
" for block " + block +
" waiting for local datanode to finish write.");
}
- try {
- wait();
- } catch (InterruptedException e) {
- isInterrupted = true;
- throw e;
- }
+ wait();
+ }
+ if (!running || !datanode.shouldRun) {
+ break;
}
pkt = ackQueue.getFirst();
expected = pkt.seqno;
- LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
- if (seqno != expected) {
+ if (numTargets > 0 && seqno != expected) {
throw new IOException("PacketResponder " + numTargets +
" for block " + block +
" expected seqno:" + expected +
@@ -941,11 +835,18 @@
lastPacketInBlock = pkt.lastPacketInBlock;
}
}
- } catch (Throwable e) {
- if (running) {
+ } catch (InterruptedException ine) {
+ isInterrupted = true;
+ } catch (IOException ioe) {
+ if (Thread.interrupted()) {
+ isInterrupted = true;
+ } else {
+ // continue to run even if can not read from mirror
+ // notify client of the error
+ // and wait for the client to shut down the pipeline
+ mirrorError = true;
LOG.info("PacketResponder " + block + " " + numTargets +
- " Exception " + StringUtils.stringifyException(e));
- running = false;
+ " Exception " + StringUtils.stringifyException(ioe));
}
}
@@ -955,8 +856,7 @@
* receiver thread (e.g. if it is ok to write to replyOut).
* It is prudent to not send any more status back to the client
* because this datanode has a problem. The upstream datanode
- * will detect a timout on heartbeats and will declare that
- * this datanode is bad, and rightly so.
+ * will detect that this datanode is bad, and rightly so.
*/
LOG.info("PacketResponder " + block + " " + numTargets +
" : Thread is interrupted.");
@@ -964,10 +864,6 @@
continue;
}
- if (!didRead) {
- op = ERROR;
- }
-
// If this is the last packet in block, then close block
// file and finalize the block before responding success
if (lastPacketInBlock) {
@@ -990,56 +886,39 @@
}
}
- // send my status back to upstream datanode
- ackReply(expected);
-
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " responded my status " +
- " for seqno " + expected);
-
- boolean success = true;
- // forward responses from downstream datanodes.
- for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
- try {
- if (op == SUCCESS) {
- op = Status.read(mirrorIn);
- if (op != SUCCESS) {
- success = false;
- LOG.debug("PacketResponder for block " + block +
- ": error code received from downstream " +
- " datanode[" + i + "] " + op);
- }
- }
- } catch (Throwable e) {
- op = ERROR;
- success = false;
+ // construct my ack message
+ Status[] replies = null;
+ if (mirrorError) { // ack read error
+ replies = new Status[2];
+ replies[0] = SUCCESS;
+ replies[1] = ERROR;
+ } else {
+ short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+ replies = new Status[1+ackLen];
+ replies[0] = SUCCESS;
+ for (int i=0; i<ackLen; i++) {
+ replies[i+1] = ack.getReply(i);
}
- op.write(replyOut);
}
- replyOut.flush();
+ PipelineAck replyAck = new PipelineAck(expected, replies);
- LOG.debug("PacketResponder " + block + " " + numTargets +
- " responded other status " + " for seqno " + expected);
-
+ // send my ack back to upstream datanode
+ replyAck.write(replyOut);
+ replyOut.flush();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets +
+ " for block " + block +
+ " responded an ack: " + replyAck);
+ }
if (pkt != null) {
// remove the packet from the ack queue
removeAckHead();
// update bytes acked
- if (success && pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+ if (replyAck.isSuccess() &&
+ pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
replicaInfo.setBytesAcked(pkt.lastByteInBlock);
}
}
- // If we were unable to read the seqno from downstream, then stop.
- if (expected == -2) {
- running = false;
- }
- // If we forwarded an error response from a downstream datanode
- // and we are acting on behalf of a client, then we quit. The
- // client will drive the recovery mechanism.
- if (op == ERROR && receiver.clientName.length() > 0) {
- running = false;
- }
} catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) {
@@ -1051,12 +930,16 @@
LOG.info("PacketResponder " + block + " " + numTargets +
" Exception " + StringUtils.stringifyException(e));
running = false;
+ if (!Thread.interrupted()) { // failure not caused by interruption
+ receiverThread.interrupt();
+ }
}
- } catch (RuntimeException e) {
+ } catch (Throwable e) {
if (running) {
LOG.info("PacketResponder " + block + " " + numTargets +
" Exception " + StringUtils.stringifyException(e));
running = false;
+ receiverThread.interrupt();
}
}
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Jan 8 14:52:46 2010
@@ -355,12 +355,14 @@
return dfsUsage.getUsed();
}
+ /**
+ * Calculate the capacity of the filesystem, after removing any
+ * reserved capacity.
+ * @return the unreserved number of bytes left in this filesystem. May be zero.
+ */
long getCapacity() throws IOException {
- if (reserved > usage.getCapacity()) {
- return 0;
- }
-
- return usage.getCapacity()-reserved;
+ long remaining = usage.getCapacity() - reserved;
+ return remaining > 0 ? remaining : 0;
}
long getAvailable() throws IOException {
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Fri Jan 8 14:52:46 2010
@@ -37,7 +37,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.Daemon;
/**
* BackupNode.
@@ -66,8 +65,6 @@
String nnHttpAddress;
/** Checkpoint manager */
Checkpointer checkpointManager;
- /** Checkpoint daemon */
- private Daemon cpDaemon;
BackupNode(Configuration conf, NamenodeRole role) throws IOException {
super(conf, role);
@@ -142,9 +139,17 @@
*/
@Override // NameNode
protected void innerClose() throws IOException {
- if(checkpointManager != null) checkpointManager.shouldRun = false;
- if(cpDaemon != null) cpDaemon.interrupt();
+ if(checkpointManager != null) {
+ // Prevent from starting a new checkpoint.
+ // Checkpoints that has already been started may proceed until
+ // the error reporting to the name-node is complete.
+ // Checkpoint manager should not be interrupted yet because it will
+ // close storage file channels and the checkpoint may fail with
+ // ClosedByInterruptException.
+ checkpointManager.shouldRun = false;
+ }
if(namenode != null && getRegistration() != null) {
+ // Exclude this node from the list of backup streams on the name-node
try {
namenode.errorReport(getRegistration(), NamenodeProtocol.FATAL,
"Shutting down.");
@@ -152,7 +157,15 @@
LOG.error("Failed to report to name-node.", e);
}
}
- RPC.stopProxy(namenode); // stop the RPC threads
+ // Stop the RPC client
+ RPC.stopProxy(namenode);
+ namenode = null;
+ // Stop the checkpoint manager
+ if(checkpointManager != null) {
+ checkpointManager.interrupt();
+ checkpointManager = null;
+ }
+ // Stop name-node threads
super.innerClose();
}
@@ -243,7 +256,7 @@
this.nnHttpAddress = getHostPortString(super.getHttpServerAddress(conf));
// get version and id info from the name-node
NamespaceInfo nsInfo = null;
- while(!stopRequested) {
+ while(!isStopRequested()) {
try {
nsInfo = handshake(namenode);
break;
@@ -262,8 +275,7 @@
*/
private void runCheckpointDaemon(Configuration conf) throws IOException {
checkpointManager = new Checkpointer(conf, this);
- cpDaemon = new Daemon(checkpointManager);
- cpDaemon.start();
+ checkpointManager.start();
}
/**
@@ -300,7 +312,7 @@
setRegistration();
NamenodeRegistration nnReg = null;
- while(!stopRequested) {
+ while(!isStopRequested()) {
try {
nnReg = namenode.register(getRegistration());
break;
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=897222&r1=897221&r2=897222&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Fri Jan 8 14:52:46 2010
@@ -41,7 +41,6 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.mortbay.log.Log;
/**
* Keeps information related to the blocks stored in the Hadoop cluster.
@@ -1614,6 +1613,7 @@
NameNode.stateChangeLog.info("BLOCK* ask " + dn.getName()
+ " to delete " + blockList);
}
+ pendingDeletionBlocksCount -= blocksToInvalidate.size();
return blocksToInvalidate.size();
}
}