You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original archive page) was not preserved in this copy.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/04/29 05:03:27 UTC
svn commit: r1097671 [1/3] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/
src/c++/libhdfs/ src/contrib/ src/contrib/hdfsproxy/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/co...
Author: todd
Date: Fri Apr 29 03:03:25 2011
New Revision: 1097671
URL: http://svn.apache.org/viewvc?rev=1097671&view=rev
Log:
Merge trunk as of r1097329 into HDFS-1073
Added:
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSLimitException.java
- copied unchanged from r1097628, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSLimitException.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java
- copied unchanged from r1097628, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
- copied unchanged from r1097628, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
- copied unchanged from r1097628, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNameNodeResourceChecker.java
- copied unchanged from r1097628, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNameNodeResourceChecker.java
Modified:
hadoop/hdfs/branches/HDFS-1073/ (props changed)
hadoop/hdfs/branches/HDFS-1073/CHANGES.txt
hadoop/hdfs/branches/HDFS-1073/bin/hdfs-config.sh
hadoop/hdfs/branches/HDFS-1073/build.xml (props changed)
hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/ (props changed)
hadoop/hdfs/branches/HDFS-1073/src/contrib/build.xml
hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/ (props changed)
hadoop/hdfs/branches/HDFS-1073/src/java/ (props changed)
hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (props changed)
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/ (props changed)
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/editsStored
hadoop/hdfs/branches/HDFS-1073/src/webapps/datanode/ (props changed)
hadoop/hdfs/branches/HDFS-1073/src/webapps/hdfs/ (props changed)
hadoop/hdfs/branches/HDFS-1073/src/webapps/secondary/ (props changed)
Propchange: hadoop/hdfs/branches/HDFS-1073/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -1,4 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs:713112
/hadoop/hdfs/branches/HDFS-265:796829-820463
/hadoop/hdfs/branches/branch-0.21:820487
-/hadoop/hdfs/trunk:1086482-1095244
+/hadoop/hdfs/trunk:1086482-1097628
Modified: hadoop/hdfs/branches/HDFS-1073/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/CHANGES.txt?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1073/CHANGES.txt Fri Apr 29 03:03:25 2011
@@ -34,6 +34,8 @@ Trunk (unreleased changes)
HDFS-1070. Speedup namenode image loading and saving by storing only
local file names. (hairong)
+
+ HDFS-1751. Intrinsic limits for HDFS files, directories (daryn via boryas).
IMPROVEMENTS
@@ -121,6 +123,29 @@ Trunk (unreleased changes)
HDFS-1844. Move "fs -help" shell command tests from HDFS to COMMOM; see
also HADOOP-7230. (Daryn Sharp via szetszwo)
+ HDFS-1840. In DFSClient, terminate the lease renewing thread when all files
+ being written are closed for a grace period, and start a new thread when
+ new files are opened for write. (szetszwo)
+
+ HDFS-1854. make failure message more useful in
+ DFSTestUtil.waitReplication(). (Matt Foley via eli)
+
+ HDFS-1562. Add rack policy tests. (eli)
+
+ HDFS-1856. TestDatanodeBlockScanner waits forever, errs without giving
+ information. (Matt Foley via eli)
+
+ HDFS-1295. Improve namenode restart times by short-circuiting the
+ first block reports from datanodes. (Matt Foley via suresh)
+
+ HDFS-1843. Discover file not found early for file append.
+ (Bharath Mundlapudi via jitendra)
+
+ HDFS-1862. Improve test reliability of HDFS-1594. (Aaron T. Myers via eli)
+
+ HDFS-1846. Preallocate edit log with OP_INVALID instead of zero bytes
+ to ensure blocks are actually allocated. (Aaron T. Myers via todd)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -193,6 +218,19 @@ Trunk (unreleased changes)
HDFS-1831. Fix append bug in FileContext and implement CreateFlag
check (related to HADOOP-7223). (suresh)
+ HDFS-1594. When the disk becomes full Namenode is getting shutdown and
+ not able to recover. (Aaron T. Myers via eli)
+
+ HDFS-1822. Handle editlog opcode conflict with 0.20.203 during upgrade,
+ by throwing an error to indicate the editlog needs to be empty.
+ (suresh)
+
+ HDFS-1808. TestBalancer waits forever, errs without giving information.
+ (Matt Foley via eli)
+
+ HDFS-1829. TestNodeCount waits forever, errs without giving information.
+ (Matt Foley via eli)
+
Release 0.22.0 - Unreleased
NEW FEATURES
@@ -414,6 +452,9 @@ Release 0.22.0 - Unreleased
HDFS-1582. Remove auto-generated native build files. (rvs via eli)
+ HDFS-1861. Rename dfs.datanode.max.xcievers and bump its default value.
+ (eli)
+
OPTIMIZATIONS
HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)
@@ -657,6 +698,14 @@ Release 0.22.0 - Unreleased
HDFS-1806. TestBlockReport.blockReport_08() and _09() are timing-dependent
and likely to fail on fast servers. (Matt Foley via eli)
+ HDFS-1845. Symlink comes up as directory after namenode restart.
+ (John George via eli)
+
+ HDFS-1666. Disable failing hdfsproxy test TestAuthorizationFilter (todd)
+
+ HDFS-1823. start-dfs.sh script fails if HADOOP_HOME is not set.
+ (tomwhite via eli)
+
Release 0.21.1 - Unreleased
IMPROVEMENTS
@@ -712,6 +761,9 @@ Release 0.21.1 - Unreleased
HDFS-1786. Some cli test cases expect a "null" message
(Uma Maheswara Rao G via todd)
+ HDFS-1855. TestDatanodeBlockScanner.testBlockCorruptionRecoveryPolicy()
+ part 2 fails in two different ways. (Matt Foley via eli)
+
Release 0.21.0 - 2010-08-13
INCOMPATIBLE CHANGES
Modified: hadoop/hdfs/branches/HDFS-1073/bin/hdfs-config.sh
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/bin/hdfs-config.sh?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/bin/hdfs-config.sh (original)
+++ hadoop/hdfs/branches/HDFS-1073/bin/hdfs-config.sh Fri Apr 29 03:03:25 2011
@@ -27,6 +27,8 @@ if [ -d "${HADOOP_COMMON_HOME}" ]; then
. "$HADOOP_COMMON_HOME"/bin/hadoop-config.sh
elif [ -d "${HADOOP_HOME}" ]; then
. "$HADOOP_HOME"/bin/hadoop-config.sh
+elif [ -e "${HADOOP_HDFS_HOME}"/bin/hadoop-config.sh ]; then
+ . "$HADOOP_HDFS_HOME"/bin/hadoop-config.sh
else
echo "Hadoop common not found."
exit
Propchange: hadoop/hdfs/branches/HDFS-1073/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -2,4 +2,4 @@
/hadoop/core/trunk/build.xml:779102
/hadoop/hdfs/branches/HDFS-265/build.xml:796829-820463
/hadoop/hdfs/branches/branch-0.21/build.xml:820487
-/hadoop/hdfs/trunk/build.xml:1086482-1095244
+/hadoop/hdfs/trunk/build.xml:1086482-1097628
Propchange: hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
/hadoop/core/trunk/src/c++/libhdfs:776175-784663
-/hadoop/hdfs/trunk/src/c++/libhdfs:1086482-1095244
+/hadoop/hdfs/trunk/src/c++/libhdfs:1086482-1097628
Modified: hadoop/hdfs/branches/HDFS-1073/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/contrib/build.xml?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/contrib/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/contrib/build.xml Fri Apr 29 03:03:25 2011
@@ -46,9 +46,11 @@
<!-- Test all the contribs. -->
<!-- ====================================================== -->
<target name="test">
+ <!-- hdfsproxy tests failing due to HDFS-1666
<subant target="test">
<fileset dir="." includes="hdfsproxy/build.xml"/>
</subant>
+ -->
</target>
Propchange: hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -2,4 +2,4 @@
/hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
/hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy:820487
-/hadoop/hdfs/trunk/src/contrib/hdfsproxy:1086482-1095244
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:1086482-1097628
Propchange: hadoop/hdfs/branches/HDFS-1073/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -2,4 +2,4 @@
/hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
/hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/java:820487
-/hadoop/hdfs/trunk/src/java:1086482-1095244
+/hadoop/hdfs/trunk/src/java:1086482-1097628
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml Fri Apr 29 03:03:25 2011
@@ -195,6 +195,20 @@ creations/deletions), or "all".</descrip
</property>
<property>
+ <name>dfs.namenode.fs-limits.max-component-length</name>
+ <value>0</value>
+ <description>Defines the maximum number of characters in each component
+ of a path. A value of 0 will disable the check.</description>
+</property>
+
+<property>
+ <name>dfs.namenode.fs-limits.max-directory-items</name>
+ <value>0</value>
+ <description>Defines the maximum number of items that a directory may
+ contain. A value of 0 will disable the check.</description>
+</property>
+
+<property>
<name>dfs.namenode.edits.dir</name>
<value>${dfs.namenode.name.dir}</value>
<description>Determines where on the local filesystem the DFS name node
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java Fri Apr 29 03:03:25 2011
@@ -45,6 +45,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -128,7 +129,6 @@ public class DFSClient implements FSCons
private volatile long serverDefaultsLastUpdate;
static Random r = new Random();
final String clientName;
- final LeaseChecker leasechecker = new LeaseChecker();
Configuration conf;
long defaultBlockSize;
private short defaultReplication;
@@ -138,6 +138,7 @@ public class DFSClient implements FSCons
final DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
final FileSystem.Statistics stats;
final int hdfsTimeout; // timeout value for a DFS operation.
+ final LeaseChecker leasechecker;
/**
* The locking hierarchy is to first acquire lock on DFSClient object, followed by
@@ -253,6 +254,7 @@ public class DFSClient implements FSCons
// The hdfsTimeout is currently the same as the ipc timeout
this.hdfsTimeout = Client.getTimeout(conf);
+ this.leasechecker = new LeaseChecker(hdfsTimeout);
this.ugi = UserGroupInformation.getCurrentUser();
@@ -753,10 +755,14 @@ public class DFSClient implements FSCons
*
* @see ClientProtocol#append(String, String)
*/
- OutputStream append(String src, int buffersize, Progressable progress)
+ OutputStream append(String src, int buffersize, Progressable progress)
throws IOException {
checkOpen();
HdfsFileStatus stat = getFileInfo(src);
+ if (stat == null) { // No file found
+ throw new FileNotFoundException("failed to append to non-existent file "
+ + src + " on client " + clientName);
+ }
OutputStream result = callAppend(stat, src, buffersize, progress);
leasechecker.put(src, result);
return result;
@@ -1358,38 +1364,106 @@ public class DFSClient implements FSCons
}
}
- boolean isLeaseCheckerStarted() {
- return leasechecker.daemon != null;
- }
-
/** Lease management*/
- class LeaseChecker implements Runnable {
+ class LeaseChecker {
+ static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+ static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
/** A map from src -> DFSOutputStream of files that are currently being
* written by this client.
*/
private final SortedMap<String, OutputStream> pendingCreates
= new TreeMap<String, OutputStream>();
+ /** The time in milliseconds that the map became empty. */
+ private long emptyTime = Long.MAX_VALUE;
+ /** A fixed lease renewal time period in milliseconds */
+ private final long renewal;
+ /** A daemon for renewing lease */
private Daemon daemon = null;
-
+ /** Only the daemon with currentId should run. */
+ private int currentId = 0;
+
+ /**
+ * A period in milliseconds that the lease renewer thread should run
+ * after the map became empty.
+ * If the map is empty for a time period longer than the grace period,
+ * the renewer should terminate.
+ */
+ private long gracePeriod;
+ /**
+ * The time period in milliseconds
+ * that the renewer sleeps for each iteration.
+ */
+ private volatile long sleepPeriod;
+
+ private LeaseChecker(final long timeout) {
+ this.renewal = (timeout > 0 && timeout < LEASE_SOFTLIMIT_PERIOD)?
+ timeout/2: LEASE_SOFTLIMIT_PERIOD/2;
+ setGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+ }
+
+ /** Set the grace period and adjust the sleep period accordingly. */
+ void setGraceSleepPeriod(final long gracePeriod) {
+ if (gracePeriod < 100L) {
+ throw new HadoopIllegalArgumentException(gracePeriod
+ + " = gracePeriod < 100ms is too small.");
+ }
+ synchronized(this) {
+ this.gracePeriod = gracePeriod;
+ }
+ final long half = gracePeriod/2;
+ this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
+ half: LEASE_RENEWER_SLEEP_DEFAULT;
+ }
+
+ /** Is the daemon running? */
+ synchronized boolean isRunning() {
+ return daemon != null && daemon.isAlive();
+ }
+
+ /** Is the empty period longer than the grace period? */
+ private synchronized boolean isRenewerExpired() {
+ return emptyTime != Long.MAX_VALUE
+ && System.currentTimeMillis() - emptyTime > gracePeriod;
+ }
+
synchronized void put(String src, OutputStream out) {
if (clientRunning) {
- if (daemon == null) {
- daemon = new Daemon(this);
+ if (daemon == null || isRenewerExpired()) {
+ //start a new deamon with a new id.
+ final int id = ++currentId;
+ daemon = new Daemon(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LeaseChecker.this.run(id);
+ } catch(InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(LeaseChecker.this.getClass().getSimpleName()
+ + " is interrupted.", e);
+ }
+ }
+ }
+ });
daemon.start();
}
pendingCreates.put(src, out);
+ emptyTime = Long.MAX_VALUE;
}
}
synchronized void remove(String src) {
pendingCreates.remove(src);
+ if (pendingCreates.isEmpty() && emptyTime == Long.MAX_VALUE) {
+ //discover the first time that the map is empty.
+ emptyTime = System.currentTimeMillis();
+ }
}
void interruptAndJoin() throws InterruptedException {
Daemon daemonCopy = null;
synchronized (this) {
- if (daemon != null) {
+ if (isRunning()) {
daemon.interrupt();
daemonCopy = daemon;
}
@@ -1456,37 +1530,30 @@ public class DFSClient implements FSCons
* Periodically check in with the namenode and renew all the leases
* when the lease period is half over.
*/
- public void run() {
- long lastRenewed = 0;
- int renewal = (int)(LEASE_SOFTLIMIT_PERIOD / 2);
- if (hdfsTimeout > 0) {
- renewal = Math.min(renewal, hdfsTimeout/2);
- }
- while (clientRunning && !Thread.interrupted()) {
- if (System.currentTimeMillis() - lastRenewed > renewal) {
+ private void run(final int id) throws InterruptedException {
+ for(long lastRenewed = System.currentTimeMillis();
+ clientRunning && !Thread.interrupted();
+ Thread.sleep(sleepPeriod)) {
+ if (System.currentTimeMillis() - lastRenewed >= renewal) {
try {
renew();
lastRenewed = System.currentTimeMillis();
} catch (SocketTimeoutException ie) {
- LOG.warn("Problem renewing lease for " + clientName +
- " for a period of " + (hdfsTimeout/1000) +
- " seconds. Shutting down HDFS client...", ie);
+ LOG.warn("Failed to renew lease for " + clientName + " for "
+ + (renewal/1000) + " seconds. Aborting ...", ie);
abort();
break;
} catch (IOException ie) {
- LOG.warn("Problem renewing lease for " + clientName +
- " for a period of " + (hdfsTimeout/1000) +
- " seconds. Will retry shortly...", ie);
+ LOG.warn("Failed to renew lease for " + clientName + " for "
+ + (renewal/1000) + " seconds. Will retry shortly ...", ie);
}
}
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(this + " is interrupted.", ie);
+ synchronized(this) {
+ if (id != currentId || isRenewerExpired()) {
+ //no longer the current daemon or expired
+ return;
}
- return;
}
}
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Apr 29 03:03:25 2011
@@ -120,6 +120,12 @@ public class DFSConfigKeys extends Commo
public static final String DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY = "dfs.namenode.delegation.token.max-lifetime";
public static final long DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7*24*60*60*1000; // 7 days
+ //Filesystem limit keys
+ public static final String DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY = "dfs.namenode.fs-limits.max-component-length";
+ public static final int DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT = 0; // no limit
+ public static final String DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY = "dfs.namenode.fs-limits.max-directory-items";
+ public static final int DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT = 0; // no limit
+
//Following keys have no defaults
public static final String DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir";
public static final String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
@@ -166,8 +172,8 @@ public class DFSConfigKeys extends Commo
public static final int DFS_DATANODE_HANDLER_COUNT_DEFAULT = 3;
public static final String DFS_DATANODE_HTTP_ADDRESS_KEY = "dfs.datanode.http.address";
public static final String DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50075";
- public static final String DFS_DATANODE_MAX_XCIEVERS_KEY = "dfs.datanode.max.xcievers";
- public static final int DFS_DATANODE_MAX_XCIEVERS_DEFAULT = 256;
+ public static final String DFS_DATANODE_MAX_RECEIVER_THREADS_KEY = "dfs.datanode.max.transfer.threads";
+ public static final int DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT = 4096;
public static final String DFS_DATANODE_NUMBLOCKS_KEY = "dfs.datanode.numblocks";
public static final int DFS_DATANODE_NUMBLOCKS_DEFAULT = 64;
public static final String DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours";
@@ -254,4 +260,9 @@ public class DFSConfigKeys extends Commo
public static final String DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY = "dfs.secondary.namenode.kerberos.https.principal";
public static final String DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
public static final int DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
+ public static final String DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY = "dfs.namenode.resource.check.interval";
+ public static final int DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT = 5000;
+ public static final String DFS_NAMENODE_DU_RESERVED_KEY = "dfs.namenode.resource.du.reserved";
+ public static final long DFS_NAMENODE_DU_RESERVED_DEFAULT = 1024 * 1024 * 100; // 100 MB
+ public static final String DFS_NAMENODE_CHECKED_VOLUMES_KEY = "dfs.namenode.resource.checked.volumes";
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java Fri Apr 29 03:03:25 2011
@@ -84,5 +84,6 @@ public class HdfsConfiguration extends C
deprecate("dfs.permissions.supergroup", DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY);
deprecate("dfs.write.packet.size", DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY);
deprecate("dfs.block.size", DFSConfigKeys.DFS_BLOCK_SIZE_KEY);
+ deprecate("dfs.datanode.max.xcievers", DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY);
}
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Fri Apr 29 03:03:25 2011
@@ -88,7 +88,11 @@ public interface FSConstants {
// Version is reflected in the data storage file.
// Versions are negative.
// Decrement LAYOUT_VERSION to define a new version.
- public static final int LAYOUT_VERSION = -31;
+ public static final int LAYOUT_VERSION = -34;
// Current version:
- // -31: add persistent transaction ids
+ // -31, -32 and -33 are reserved for 0.20.203, 0.20.204 and 0.22.
+ // -34: persistent transaction IDs
+
+ // Record of version numbers for specific changes:
+ public static final int FIRST_TXNID_BASED_LAYOUT_VERSION = -35;
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Fri Apr 29 03:03:25 2011
@@ -78,6 +78,9 @@ public abstract class Storage extends St
// last layout version that did not support persistent rbw replicas
public static final int PRE_RBW_LAYOUT_VERSION = -19;
+ /** Layout versions of 0.20.203 release */
+ public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
+
private static final String STORAGE_FILE_LOCK = "in_use.lock";
protected static final String STORAGE_FILE_VERSION = "VERSION";
public static final String STORAGE_DIR_CURRENT = "current";
@@ -871,4 +874,13 @@ public abstract class Storage extends St
+ "-" + Integer.toString(storage.getLayoutVersion())
+ "-" + Long.toString(storage.getCTime());
}
+
+ public static boolean is203LayoutVersion(int layoutVersion) {
+ for (int lv203 : LAYOUT_VERSIONS_203) {
+ if (lv203 == layoutVersion) {
+ return true;
+ }
+ }
+ return false;
+ }
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Apr 29 03:03:25 2011
@@ -1131,23 +1131,27 @@ public class DataNode extends Configured
* @throws IOException
*/
private DatanodeCommand blockReport() throws IOException {
- // send block report
+ // send block report if timer has expired.
DatanodeCommand cmd = null;
long startTime = now();
if (startTime - lastBlockReport > blockReportInterval) {
- //
- // Send latest block report if timer has expired.
- // Get back a list of local block(s) that are obsolete
- // and can be safely GC'ed.
- //
- long brStartTime = now();
+
+ // Create block report
+ long brCreateStartTime = now();
BlockListAsLongs bReport = data.getBlockReport();
+ // Send block report
+ long brSendStartTime = now();
cmd = namenode.blockReport(dnRegistration, bReport.getBlockListAsLongs());
- long brTime = now() - brStartTime;
- myMetrics.blockReports.inc(brTime);
- LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
- " blocks got processed in " + brTime + " msecs");
+
+ // Log the block report processing stats from Datanode perspective
+ long brSendCost = now() - brSendStartTime;
+ long brCreateCost = brSendStartTime - brCreateStartTime;
+ myMetrics.blockReports.inc(brSendCost);
+ LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+ + " blocks took " + brCreateCost + " msec to generate and "
+ + brSendCost + " msecs for RPC and NN processing");
+
//
// If we have sent the first block report, then wait a random
// time before we start the periodic block reports.
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Fri Apr 29 03:03:25 2011
@@ -56,7 +56,8 @@ class DataXceiverServer implements Runna
* Enforcing the limit is required in order to avoid data-node
* running out of memory.
*/
- int maxXceiverCount = DFSConfigKeys.DFS_DATANODE_MAX_XCIEVERS_DEFAULT;
+ int maxXceiverCount =
+ DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
/** A manager to make sure that cluster balancing does not
* take too much resources.
@@ -115,8 +116,8 @@ class DataXceiverServer implements Runna
this.datanode = datanode;
this.maxXceiverCount =
- conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_XCIEVERS_KEY,
- DFSConfigKeys.DFS_DATANODE_MAX_XCIEVERS_DEFAULT);
+ conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
+ DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT);
this.estimateBlockSize =
conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
Propchange: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -4,4 +4,4 @@
/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:776175-785643,785929-786278
/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:820487
-/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:1086482-1095244
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:1086482-1097628
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Fri Apr 29 03:03:25 2011
@@ -36,13 +36,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
-import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks;
/**
* Keeps information related to the blocks stored in the Hadoop cluster.
@@ -763,8 +763,8 @@ public class BlockManager {
}
// Go through all blocks that need replications.
- BlockIterator neededReplicationsIterator = neededReplications
- .iterator();
+ UnderReplicatedBlocks.BlockIterator neededReplicationsIterator =
+ neededReplications.iterator();
// skip to the first unprocessed block, which is at replIndex
for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
neededReplicationsIterator.next();
@@ -1053,28 +1053,59 @@ public class BlockManager {
*/
}
}
+
+ /**
+ * One entry of the "toUC" list assembled while processing a block report:
+ * a pending update to an under-construction block, pairing the NameNode's
+ * stored {@link BlockInfoUnderConstruction} with the {@link ReplicaState}
+ * that the datanode reported for its replica.
+ * NOTE(review): only the stored block and the state are kept; the Block the
+ * datanode actually reported (its generation stamp / length) is not.
+ */
+ private static class StatefulBlockInfo {
+ /** The NameNode's record of the under-construction block. */
+ final BlockInfoUnderConstruction storedBlock;
+ /** Replica state as reported by the datanode. */
+ final ReplicaState reportedState;
+
+ StatefulBlockInfo(BlockInfoUnderConstruction ucBlock, ReplicaState state) {
+ storedBlock = ucBlock;
+ reportedState = state;
+ }
+ }
/**
* The given node is reporting all its blocks. Use this info to
- * update the (machine-->blocklist) and (block-->machinelist) tables.
+ * update the (datanode-->blocklist) and (block-->nodelist) tables.
*/
- public void processReport(DatanodeDescriptor node,
- BlockListAsLongs report) throws IOException {
- //
+ public void processReport(DatanodeDescriptor node, BlockListAsLongs report)
+ throws IOException {
+
+ boolean isFirstBlockReport = (node.numBlocks() == 0);
+ if (isFirstBlockReport) {
+ // Initial block reports can be processed a lot more efficiently than
+ // ordinary block reports. This shortens NN restart times.
+ processFirstBlockReport(node, report);
+ return;
+ }
+
+ // Normal case:
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
//
- Collection<Block> toAdd = new LinkedList<Block>();
+ Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
Collection<Block> toRemove = new LinkedList<Block>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
- node.reportDiff(this, report, toAdd, toRemove, toInvalidate, toCorrupt);
+ Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+ reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
+ // Process the blocks on each queue
+ for (StatefulBlockInfo b : toUC) {
+ addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+ }
for (Block b : toRemove) {
removeStoredBlock(b, node);
}
- for (Block b : toAdd) {
- addStoredBlock(b, node, null);
+ for (BlockInfo b : toAdd) {
+ addStoredBlock(b, node, null, true);
}
for (Block b : toInvalidate) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block "
@@ -1088,16 +1119,287 @@ public class BlockManager {
}
/**
+ * processFirstBlockReport is intended only for processing "initial" block
+ * reports, the first block report received from a DN after it registers.
+ * It just adds all the valid replicas to the datanode, without calculating
+ * a toRemove list (since there won't be any). It also silently discards
+ * any invalid blocks, thereby deferring their processing until
+ * the next block report.
+ * @param node - DatanodeDescriptor of the node that sent the report
+ * @param report - the initial block report, to be processed
+ * @throws IOException
+ */
+ void processFirstBlockReport(DatanodeDescriptor node, BlockListAsLongs report)
+ throws IOException {
+ // A null report carries no replicas; nothing to do.
+ if (report == null) return;
+ assert (namesystem.hasWriteLock());
+ // Only valid while the NameNode has no blocks recorded for this datanode.
+ assert (node.numBlocks() == 0);
+ BlockReportIterator itBR = report.getBlockReportIterator();
+
+ while(itBR.hasNext()) {
+ Block iblk = itBR.next();
+ ReplicaState reportedState = itBR.getCurrentReplicaState();
+ BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
+ // Block is not in blocksMap: skip it. Unlike reportDiff(), it is not
+ // queued for invalidation here; that is deferred to a later report.
+ if (storedBlock == null) continue;
+
+ // If block is corrupt, mark it right away and continue to next block.
+ BlockUCState ucState = storedBlock.getBlockUCState();
+ if (isReplicaCorrupt(iblk, reportedState, storedBlock, ucState, node)) {
+ markBlockAsCorrupt(storedBlock, node);
+ continue;
+ }
+
+ // If block is under construction, add this replica (as reported by the
+ // datanode) to its list of replicas
+ if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+ ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+ node, iblk, reportedState);
+ //and fall through to next clause
+ }
+ // Finalized replicas are also recorded as a location of the block,
+ // whether or not the block is under construction; other states are not.
+ if (reportedState == ReplicaState.FINALIZED) {
+ addStoredBlockImmediate(storedBlock, node);
+ }
+ }
+ }
+
+ /**
+ * Diff a block report against the replicas the NameNode currently
+ * associates with datanode {@code dn}. Each reported replica is classified
+ * by processReportedBlock() into toAdd, toInvalidate, toCorrupt or toUC.
+ * Replicas associated with {@code dn} but missing from the report are added
+ * to toRemove. A null report is treated as an empty report.
+ *
+ * Mechanism: a temporary delimiter block is inserted into {@code dn}'s
+ * block list, and every reported replica that {@code dn} already holds is
+ * moved to the head of that list. Afterwards the unreported replicas are
+ * exactly the ones following the delimiter. The delimiter is removed before
+ * returning.
+ *
+ * @param dn the datanode that sent the report
+ * @param newReport the reported replicas; may be null
+ * @param toAdd output: replicas to add to the DatanodeDescriptor
+ * @param toRemove output: replicas to remove from the DatanodeDescriptor
+ * @param toInvalidate output: replicas that should be removed from the DN
+ * @param toCorrupt output: replicas to add to the corrupt replicas list
+ * @param toUC output: replicas of under-construction blocks
+ */
+ void reportDiff(DatanodeDescriptor dn,
+ BlockListAsLongs newReport,
+ Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
+ Collection<Block> toRemove, // remove from DatanodeDescriptor
+ Collection<Block> toInvalidate, // should be removed from DN
+ Collection<BlockInfo> toCorrupt, // add to corrupt replicas list
+ Collection<StatefulBlockInfo> toUC) { // add to under-construction list
+ // place a delimiter in the list which separates blocks
+ // that have been reported from those that have not
+ BlockInfo delimiter = new BlockInfo(new Block(), 1);
+ boolean added = dn.addBlock(delimiter);
+ assert added : "Delimiting block cannot be present in the node";
+ if(newReport == null)
+ newReport = new BlockListAsLongs();
+ // scan the report and process newly reported blocks
+ BlockReportIterator itBR = newReport.getBlockReportIterator();
+ while(itBR.hasNext()) {
+ Block iblk = itBR.next();
+ ReplicaState iState = itBR.getCurrentReplicaState();
+ BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
+ toAdd, toInvalidate, toCorrupt, toUC);
+ // move block to the head of the list (only if dn already holds it),
+ // so it ends up on the "reported" side of the delimiter
+ if(storedBlock != null && storedBlock.findDatanode(dn) >= 0)
+ dn.moveBlockToHead(storedBlock);
+ }
+ // collect blocks that have not been reported
+ // all of them are next to the delimiter
+ Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
+ delimiter.getNext(0), dn);
+ while(it.hasNext())
+ toRemove.add(it.next());
+ dn.removeBlock(delimiter);
+ }
+
+ /**
+ * Process a block replica reported by the data-node.
+ * No side effects except adding to the passed-in Collections.
+ *
+ * <ol>
+ * <li>If the block is not known to the system (not in blocksMap) then the
+ * data-node should be notified to invalidate this block.</li>
+ * <li>If the reported replica is valid, that is, has the same generation
+ * stamp and length as recorded on the name-node, then the replica location
+ * should be added to the name-node.</li>
+ * <li>If the reported replica is not valid, then it is marked as corrupt,
+ * which triggers replication of the existing valid replicas.
+ * Corrupt replicas are removed from the system when the block
+ * is fully replicated.</li>
+ * <li>If the reported replica is for a block currently marked "under
+ * construction" in the NN, then it should be added to the
+ * BlockInfoUnderConstruction's list of replicas.</li>
+ * </ol>
+ *
+ * @param dn descriptor for the datanode that made the report
+ * @param block reported block replica
+ * @param reportedState reported replica state
+ * @param toAdd add to DatanodeDescriptor
+ * @param toInvalidate missing blocks (not in the blocks map)
+ * should be removed from the data-node
+ * @param toCorrupt replicas with unexpected length or generation stamp;
+ * add to corrupt replicas
+ * @param toUC replicas of blocks currently under construction
+ * @return the block as stored in blocksMap, or null if the reported block
+ * is not in blocksMap (in which case it has been added to toInvalidate)
+ */
+ BlockInfo processReportedBlock(DatanodeDescriptor dn,
+ Block block, ReplicaState reportedState,
+ Collection<BlockInfo> toAdd,
+ Collection<Block> toInvalidate,
+ Collection<BlockInfo> toCorrupt,
+ Collection<StatefulBlockInfo> toUC) {
+
+ if(FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug("Reported block " + block
+ + " on " + dn.getName() + " size " + block.getNumBytes()
+ + " replicaState = " + reportedState);
+ }
+
+ // find block by blockId
+ BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+ if(storedBlock == null) {
+ // If blocksMap does not contain reported block id,
+ // the replica should be removed from the data-node.
+ toInvalidate.add(new Block(block));
+ return null;
+ }
+ BlockUCState ucState = storedBlock.getBlockUCState();
+
+ // Block is on the NN
+ if(FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug("In memory blockUCState = " + ucState);
+ }
+
+ // Ignore replicas already scheduled to be removed from the DN
+ if(belongsToInvalidates(dn.getStorageID(), block)) {
+ // Name the datanode in the message; "this" is the BlockManager here.
+ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
+ + " in recentInvalidatesSet should not appear in DN " + dn;
+ return storedBlock;
+ }
+
+ if (isReplicaCorrupt(block, reportedState, storedBlock, ucState, dn)) {
+ toCorrupt.add(storedBlock);
+ return storedBlock;
+ }
+
+ if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+ toUC.add(new StatefulBlockInfo(
+ (BlockInfoUnderConstruction)storedBlock, reportedState));
+ return storedBlock;
+ }
+
+ //add replica if appropriate
+ if (reportedState == ReplicaState.FINALIZED
+ && storedBlock.findDatanode(dn) < 0) {
+ toAdd.add(storedBlock);
+ }
+ return storedBlock;
+ }
+
+ /*
+ * The next two methods test the various cases under which we must conclude
+ * the replica is corrupt, or under construction. These are laid out
+ * as switch statements, on the theory that it is easier to understand
+ * the combinatorics of reportedState and ucState that way. It should be
+ * at least as efficient as boolean expressions.
+ */
+ /**
+ * Decide whether a reported replica must be treated as corrupt.
+ * <ul>
+ * <li>FINALIZED: corrupt only if the stored block is COMPLETE or COMMITTED
+ * and its generation stamp or length differs from the reported one.</li>
+ * <li>RBW / RWR: corrupt if the stored block is already complete.</li>
+ * <li>RUR, TEMPORARY or any other state: a warning is logged and the
+ * replica is treated as corrupt.</li>
+ * </ul>
+ * @param iblk the replica as reported by the datanode
+ * @param reportedState replica state reported by the datanode
+ * @param storedBlock the block as stored in blocksMap
+ * @param ucState storedBlock's BlockUCState
+ * @param dn the reporting datanode (used only in the warning message)
+ * @return true if the replica should be considered corrupt
+ */
+ private boolean isReplicaCorrupt(Block iblk, ReplicaState reportedState,
+ BlockInfo storedBlock, BlockUCState ucState,
+ DatanodeDescriptor dn) {
+ switch(reportedState) {
+ case FINALIZED:
+ switch(ucState) {
+ case COMPLETE:
+ case COMMITTED:
+ return (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()
+ || storedBlock.getNumBytes() != iblk.getNumBytes());
+ default:
+ return false;
+ }
+ case RBW:
+ case RWR:
+ return storedBlock.isComplete();
+ case RUR: // should not be reported
+ case TEMPORARY: // should not be reported
+ default:
+ // NOTE(review): unexpected states are reported as corrupt here; the
+ // DatanodeDescriptor code this replaces only logged them. Confirm the
+ // change is intended.
+ FSNamesystem.LOG.warn("Unexpected replica state " + reportedState
+ + " for block: " + storedBlock +
+ " on " + dn.getName() + " size " + storedBlock.getNumBytes());
+ return true;
+ }
+ }
+
+ /**
+ * Decide whether, for this reported replica, the stored block is to be
+ * treated as under construction, i.e. the replica should be added to the
+ * BlockInfoUnderConstruction's list of replicas.
+ * <ul>
+ * <li>FINALIZED: true if the stored block is UNDER_CONSTRUCTION or
+ * UNDER_RECOVERY.</li>
+ * <li>RBW / RWR: true if the stored block is not yet complete.</li>
+ * <li>RUR, TEMPORARY or any other state: false.</li>
+ * </ul>
+ * @param storedBlock the block as stored in blocksMap
+ * @param ucState storedBlock's BlockUCState
+ * @param reportedState replica state reported by the datanode
+ * @return true if the replica belongs to an under-construction block
+ */
+ private boolean isBlockUnderConstruction(BlockInfo storedBlock,
+ BlockUCState ucState, ReplicaState reportedState) {
+ switch(reportedState) {
+ case FINALIZED:
+ switch(ucState) {
+ case UNDER_CONSTRUCTION:
+ case UNDER_RECOVERY:
+ return true;
+ default:
+ return false;
+ }
+ case RBW:
+ case RWR:
+ return (!storedBlock.isComplete());
+ case RUR: // should not be reported
+ case TEMPORARY: // should not be reported
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Record a replica of an under-construction block. Adds {@code node} to
+ * the block's replica list if it is not already present. If the replica is
+ * FINALIZED and {@code node} is not yet a location of the block, it is also
+ * added through addStoredBlock() with per-block logging enabled.
+ * @param block the NameNode's stored under-construction block
+ * @param node the datanode holding the replica
+ * @param reportedState replica state reported by the datanode
+ * @throws IOException propagated from addStoredBlock()
+ */
+ void addStoredBlockUnderConstruction(
+ BlockInfoUnderConstruction block,
+ DatanodeDescriptor node,
+ ReplicaState reportedState)
+ throws IOException {
+ // NOTE(review): the stored block itself is passed as the replica argument.
+ // This method never receives the Block the datanode reported, so the
+ // reported generation stamp/length are presumably not what gets recorded
+ // for this replica. Confirm this is intended.
+ block.addReplicaIfNotPresent(node, block, reportedState);
+ if (reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
+ addStoredBlock(block, node, null, true);
+ }
+ }
+
+ /**
+ * Faster version of {@link #addStoredBlock}, intended for the initial
+ * block report at startup. If the NameNode is not in startup safe mode, or
+ * is already populating replication queues, this delegates to
+ * addStoredBlock() with per-block logging disabled.
+ * Assumes it is called "immediately" after storedBlock was looked up, so
+ * there is no need to refresh storedBlock from blocksMap.
+ * On the fast path it does not handle under/over-replication or touch
+ * pendingReplications or corruptReplicas, because it runs in startup safe
+ * mode. It also does not log every block, since there are typically
+ * millions of them.
+ * @param storedBlock the block as stored in blocksMap; must not be null
+ * @param node the datanode reporting a finalized replica of the block
+ * @throws IOException propagated from the helpers it calls
+ */
+ private void addStoredBlockImmediate(BlockInfo storedBlock,
+ DatanodeDescriptor node)
+ throws IOException {
+ assert (storedBlock != null && namesystem.hasWriteLock());
+ boolean fastPath = namesystem.isInStartupSafeMode()
+ && !namesystem.isPopulatingReplQueues();
+ if (!fastPath) {
+ // Regular path, without per-block logging.
+ addStoredBlock(storedBlock, node, null, false);
+ return;
+ }
+
+ // Startup safe mode: just record the replica location.
+ node.addBlock(storedBlock);
+
+ // A COMMITTED block with enough live replicas can now be completed.
+ int liveReplicas = countLiveNodes(storedBlock);
+ BlockInfo block = storedBlock;
+ if (block.getBlockUCState() == BlockUCState.COMMITTED
+ && liveReplicas >= minReplication) {
+ block = completeBlock(block.getINode(), block);
+ }
+
+ // Only complete blocks count towards the safe block threshold.
+ if (block.isComplete()) {
+ namesystem.incrementSafeBlockCount(liveReplicas);
+ }
+ }
+
+ /**
* Modify (block-->datanode) map. Remove block from set of
* needed replications if this takes care of the problem.
* @return the block that is stored in blockMap.
*/
- private Block addStoredBlock(final Block block,
+ private Block addStoredBlock(final BlockInfo block,
DatanodeDescriptor node,
- DatanodeDescriptor delNodeHint)
+ DatanodeDescriptor delNodeHint,
+ boolean logEveryBlock)
throws IOException {
- assert (namesystem.hasWriteLock());
- BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+ assert (block != null && namesystem.hasWriteLock());
+ BlockInfo storedBlock;
+ if (block.getClass() == BlockInfoUnderConstruction.class) {
+ //refresh our copy in case the block got completed in another thread
+ storedBlock = blocksMap.getStoredBlock(block);
+ } else {
+ storedBlock = block;
+ }
if (storedBlock == null || storedBlock.getINode() == null) {
// If this block does not belong to anyfile, then we are done.
NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
@@ -1113,29 +1415,25 @@ public class BlockManager {
INodeFile fileINode = storedBlock.getINode();
assert fileINode != null : "Block must belong to a file";
- // add block to the data-node
+ // add block to the datanode
boolean added = node.addBlock(storedBlock);
- int curReplicaDelta = 0;
+ int curReplicaDelta;
if (added) {
curReplicaDelta = 1;
- //
- // At startup time, because too many new blocks come in
- // they take up lots of space in the log file.
- // So, we log only when namenode is out of safemode.
- //
- if (!namesystem.isInSafeMode()) {
+ if (logEveryBlock) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
+ "blockMap updated: " + node.getName() + " is added to " +
storedBlock + " size " + storedBlock.getNumBytes());
}
} else {
+ curReplicaDelta = 0;
NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
+ "Redundant addStoredBlock request received for " + storedBlock
+ " on " + node.getName() + " size " + storedBlock.getNumBytes());
}
- // filter out containingNodes that are marked for decommission.
+ // Now check for completion of blocks and safe block count
NumberReplicas num = countNodes(storedBlock);
int numLiveReplicas = num.liveReplicas();
int numCurrentReplica = numLiveReplicas
@@ -1147,18 +1445,19 @@ public class BlockManager {
// check whether safe replication is reached for the block
// only complete blocks are counted towards that
+ // Is no-op if not in safe mode.
if(storedBlock.isComplete())
namesystem.incrementSafeBlockCount(numCurrentReplica);
- // if file is under construction, then check whether the block
- // can be completed
+ // if file is under construction, then done for now
if (fileINode.isUnderConstruction()) {
return storedBlock;
}
- // do not handle mis-replicated blocks during start up
- if (!namesystem.isPopulatingReplQueues())
+ // do not try to handle over/under-replicated blocks during safe mode
+ if (!namesystem.isPopulatingReplQueues()) {
return storedBlock;
+ }
// handle underReplication/overReplication
short fileReplication = fileINode.getReplication();
@@ -1393,18 +1692,22 @@ public class BlockManager {
pendingReplications.remove(block);
// blockReceived reports a finalized block
- Collection<Block> toAdd = new LinkedList<Block>();
+ Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
- node.processReportedBlock(this, block, ReplicaState.FINALIZED,
- toAdd, toInvalidate, toCorrupt);
- // the block is only in one of the lists
+ Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+ processReportedBlock(node, block, ReplicaState.FINALIZED,
+ toAdd, toInvalidate, toCorrupt, toUC);
+ // the block is only in one of the to-do lists
// if it is in none then data-node already has it
- assert toAdd.size() + toInvalidate.size() <= 1 :
- "The block should be only in one of the lists.";
+ assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
+ : "The block should be only in one of the lists.";
- for (Block b : toAdd) {
- addStoredBlock(b, node, delHintNode);
+ for (StatefulBlockInfo b : toUC) {
+ addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+ }
+ for (BlockInfo b : toAdd) {
+ addStoredBlock(b, node, delHintNode, true);
}
for (Block b : toInvalidate) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
@@ -1446,6 +1749,32 @@ public class BlockManager {
return new NumberReplicas(live, count, corrupt, excess);
}
+ /**
+ * Simpler, faster form of {@link #countNodes} that only returns the number
+ * of live replicas of a block. Outside startup safe mode (as reported by
+ * namesystem.isInStartupSafeMode()) it returns
+ * {@code countNodes(b).liveReplicas()}. In startup safe mode it counts
+ * every datanode holding the block, except those listed in corruptReplicas
+ * for it. Excess replicas and decommissioned or decommissioning nodes are
+ * not considered.
+ *
+ * @param b the block being tested
+ * @return count of live nodes for this block
+ */
+ int countLiveNodes(BlockInfo b) {
+ if (namesystem.isInStartupSafeMode()) {
+ Iterator<DatanodeDescriptor> holders = blocksMap.nodeIterator(b);
+ Collection<DatanodeDescriptor> corrupt = corruptReplicas.getNodes(b);
+ int live = 0;
+ while (holders.hasNext()) {
+ DatanodeDescriptor holder = holders.next();
+ boolean isCorrupt = corrupt != null && corrupt.contains(holder);
+ if (!isCorrupt) {
+ live++;
+ }
+ }
+ return live;
+ }
+ return countNodes(b).liveReplicas();
+ }
+
private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
NumberReplicas num) {
int curReplicas = num.liveReplicas();
@@ -1781,7 +2110,7 @@ public class BlockManager {
/**
* Return an iterator over the set of blocks for which there are no replicas.
*/
- BlockIterator getCorruptReplicaBlockIterator() {
+ UnderReplicatedBlocks.BlockIterator getCorruptReplicaBlockIterator() {
return neededReplications
.iterator(UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Fri Apr 29 03:03:25 2011
@@ -24,11 +24,8 @@ import java.util.*;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -278,7 +275,7 @@ public class DatanodeDescriptor extends
/**
* Iterates over the list of blocks belonging to the datanode.
*/
- static private class BlockIterator implements Iterator<BlockInfo> {
+ static class BlockIterator implements Iterator<BlockInfo> {
private BlockInfo current;
private DatanodeDescriptor node;
@@ -421,141 +418,6 @@ public class DatanodeDescriptor extends
return blockarray;
}
- void reportDiff(BlockManager blockManager,
- BlockListAsLongs newReport,
- Collection<Block> toAdd, // add to DatanodeDescriptor
- Collection<Block> toRemove, // remove from DatanodeDescriptor
- Collection<Block> toInvalidate, // should be removed from DN
- Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
- // place a delimiter in the list which separates blocks
- // that have been reported from those that have not
- BlockInfo delimiter = new BlockInfo(new Block(), 1);
- boolean added = this.addBlock(delimiter);
- assert added : "Delimiting block cannot be present in the node";
- if(newReport == null)
- newReport = new BlockListAsLongs();
- // scan the report and process newly reported blocks
- BlockReportIterator itBR = newReport.getBlockReportIterator();
- while(itBR.hasNext()) {
- Block iblk = itBR.next();
- ReplicaState iState = itBR.getCurrentReplicaState();
- BlockInfo storedBlock = processReportedBlock(blockManager, iblk, iState,
- toAdd, toInvalidate, toCorrupt);
- // move block to the head of the list
- if(storedBlock != null && storedBlock.findDatanode(this) >= 0)
- this.moveBlockToHead(storedBlock);
- }
- // collect blocks that have not been reported
- // all of them are next to the delimiter
- Iterator<? extends Block> it = new BlockIterator(delimiter.getNext(0),this);
- while(it.hasNext())
- toRemove.add(it.next());
- this.removeBlock(delimiter);
- }
-
- /**
- * Process a block replica reported by the data-node.
- *
- * <ol>
- * <li>If the block is not known to the system (not in blocksMap) then the
- * data-node should be notified to invalidate this block.</li>
- * <li>If the reported replica is valid that is has the same generation stamp
- * and length as recorded on the name-node, then the replica location is
- * added to the name-node.</li>
- * <li>If the reported replica is not valid, then it is marked as corrupt,
- * which triggers replication of the existing valid replicas.
- * Corrupt replicas are removed from the system when the block
- * is fully replicated.</li>
- * </ol>
- *
- * @param blockManager
- * @param block reported block replica
- * @param rState reported replica state
- * @param toAdd add to DatanodeDescriptor
- * @param toInvalidate missing blocks (not in the blocks map)
- * should be removed from the data-node
- * @param toCorrupt replicas with unexpected length or generation stamp;
- * add to corrupt replicas
- * @return
- */
- BlockInfo processReportedBlock(
- BlockManager blockManager,
- Block block, // reported block replica
- ReplicaState rState, // reported replica state
- Collection<Block> toAdd, // add to DatanodeDescriptor
- Collection<Block> toInvalidate, // should be removed from DN
- Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
- if(FSNamesystem.LOG.isDebugEnabled()) {
- FSNamesystem.LOG.debug("Reported block " + block
- + " on " + getName() + " size " + block.getNumBytes()
- + " replicaState = " + rState);
- }
-
- // find block by blockId
- BlockInfo storedBlock = blockManager.blocksMap.getStoredBlock(block);
- if(storedBlock == null) {
- // If blocksMap does not contain reported block id,
- // the replica should be removed from the data-node.
- toInvalidate.add(new Block(block));
- return null;
- }
-
- if(FSNamesystem.LOG.isDebugEnabled()) {
- FSNamesystem.LOG.debug("In memory blockUCState = " +
- storedBlock.getBlockUCState());
- }
-
- // Ignore replicas already scheduled to be removed from the DN
- if(blockManager.belongsToInvalidates(getStorageID(), block)) {
- assert storedBlock.findDatanode(this) < 0 : "Block " + block
- + " in recentInvalidatesSet should not appear in DN " + this;
- return storedBlock;
- }
-
- // Block is on the DN
- boolean isCorrupt = false;
- switch(rState) {
- case FINALIZED:
- switch(storedBlock.getBlockUCState()) {
- case COMPLETE:
- case COMMITTED:
- if(storedBlock.getGenerationStamp() != block.getGenerationStamp()
- || storedBlock.getNumBytes() != block.getNumBytes())
- isCorrupt = true;
- break;
- case UNDER_CONSTRUCTION:
- case UNDER_RECOVERY:
- ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
- this, block, rState);
- }
- if(!isCorrupt && storedBlock.findDatanode(this) < 0)
- if (storedBlock.getNumBytes() != block.getNumBytes()) {
- toAdd.add(new Block(block));
- } else {
- toAdd.add(storedBlock);
- }
- break;
- case RBW:
- case RWR:
- if(!storedBlock.isComplete())
- ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
- this, block, rState);
- else
- isCorrupt = true;
- break;
- case RUR: // should not be reported
- case TEMPORARY: // should not be reported
- default:
- FSNamesystem.LOG.warn("Unexpected replica state " + rState
- + " for block: " + storedBlock +
- " on " + getName() + " size " + storedBlock.getNumBytes());
- break;
- }
- if(isCorrupt)
- toCorrupt.add(storedBlock);
- return storedBlock;
- }
-
/** Serialization for FSEditLog */
void readFieldsFromFSEditLog(DataInput in) throws IOException {
this.name = DeprecatedUTF8.readString(in);
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Fri Apr 29 03:03:25 2011
@@ -43,7 +43,14 @@ class EditLogFileOutputStream extends Ed
private DataOutputBuffer bufCurrent; // current buffer for writing
private DataOutputBuffer bufReady; // buffer ready for flushing
final private int initBufferSize; // inital buffer size
- static ByteBuffer fill = ByteBuffer.allocateDirect(512); // preallocation
+ static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
+
+ static {
+ // Pre-fill the whole preallocation buffer with OP_INVALID bytes; this is
+ // the content written into the preallocated region of the edit log.
+ final byte invalidOp = FSEditLogOpCodes.OP_INVALID.getOpCode();
+ fill.position(0);
+ while (fill.hasRemaining()) {
+ fill.put(invalidOp);
+ }
+ }
/**
* Creates output buffers and file object.
@@ -190,12 +197,11 @@ class EditLogFileOutputStream extends Ed
FSNamesystem.LOG.debug("Preallocating Edit log, current size "
+ fc.size());
}
- long newsize = position + 1024 * 1024; // 1MB
fill.position(0);
- int written = fc.write(fill, newsize);
+ int written = fc.write(fill, position);
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("Edit log size is now " + fc.size() +
- " written " + written + " bytes " + " at offset " + newsize);
+ " written " + written + " bytes " + " at offset " + position);
}
}
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Apr 29 03:03:25 2011
@@ -42,6 +42,8 @@ import org.apache.hadoop.hdfs.Distribute
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.FSLimitException;
+import org.apache.hadoop.hdfs.protocol.FSLimitException.*;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -68,6 +70,8 @@ class FSDirectory implements Closeable {
FSImage fsImage;
private volatile boolean ready = false;
private static final long UNKNOWN_DISK_SPACE = -1;
+ private final int maxComponentLength;
+ private final int maxDirItems;
private final int lsLimit; // max list limit
// lock to protect BlockMap.
@@ -119,6 +123,14 @@ class FSDirectory implements Closeable {
this.lsLimit = configuredLimit>0 ?
configuredLimit : DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT;
+ // filesystem limits
+ this.maxComponentLength = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
+ DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);
+ this.maxDirItems = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT);
+
int threshold = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY,
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT);
@@ -158,7 +170,7 @@ class FSDirectory implements Closeable {
}
writeLock();
try {
- this.ready = true;
+ setReady(true);
this.nameCache.initialized();
cond.signalAll();
} finally {
@@ -166,6 +178,11 @@ class FSDirectory implements Closeable {
}
}
+ /**
+ * Sets the {@code ready} flag of this directory. verifyFsLimits only throws
+ * filesystem-limit violations once this flag is true; while it is false
+ * (per verifyFsLimits' comment, while the edits log is still being
+ * processed) violations are only logged. Protected so unit tests can set it.
+ *
+ * @param flag new value for {@code ready}
+ */
+ protected void setReady(boolean flag) {
+ ready = flag;
+ }
+
private void incrDeletedFileCount(int count) {
if (getFSNamesystem() != null)
NameNode.getNameNodeMetrics().numFilesDeleted.inc(count);
@@ -1613,12 +1630,58 @@ class FSDirectory implements Closeable {
commonAncestor);
}
+ /**
+ * Verify that adding {@code child} under the inode at {@code pos-1} does not
+ * violate the configured filesystem limits: the maximum length of a single
+ * path component and the maximum number of items in one directory. A
+ * configured limit of 0 disables the corresponding check.
+ *
+ * When a limit is violated, the offending path is recorded on the exception.
+ * The exception is thrown only if the directory is {@code ready}; otherwise
+ * (while the edits log is still being processed) the violation is logged so
+ * that pre-existing paths exceeding the limits do not abort loading.
+ *
+ * @param pathComponents inodes along the path; the parent directory of
+ * {@code child} is at index {@code pos-1}
+ * @param pos index at which {@code child} would be added
+ * @param child the inode being added
+ * @throws PathComponentTooLongException child's name is too long
+ * @throws MaxDirectoryItemsExceededException items per directory is exceeded
+ */
+ protected <T extends INode> void verifyFsLimits(INode[] pathComponents,
+ int pos, T child) throws FSLimitException {
+ // Only the name-length violation reports the child's own name in the path;
+ // the directory-size violation reports the parent directory.
+ boolean includeChildName = false;
+ try {
+ if (maxComponentLength != 0) {
+ int length = child.getLocalName().length();
+ if (length > maxComponentLength) {
+ includeChildName = true;
+ throw new PathComponentTooLongException(maxComponentLength, length);
+ }
+ }
+ if (maxDirItems != 0) {
+ INodeDirectory parent = (INodeDirectory)pathComponents[pos-1];
+ int count = parent.getChildren().size();
+ // Fails when the parent already holds maxDirItems children, since
+ // adding this child would exceed the limit.
+ if (count >= maxDirItems) {
+ throw new MaxDirectoryItemsExceededException(maxDirItems, count);
+ }
+ }
+ } catch (FSLimitException e) {
+ String badPath = getFullPathName(pathComponents, pos-1);
+ if (includeChildName) {
+ badPath += Path.SEPARATOR + child.getLocalName();
+ }
+ e.setPathName(badPath);
+ // Do not throw if edits log is still being processed
+ if (ready) throw(e);
+ // log pre-existing paths that exceed limits
+ NameNode.LOG.error("FSDirectory.verifyFsLimits - " + e.getLocalizedMessage());
+ }
+ }
+
/** Add a node child to the inodes at index pos.
* Its ancestors are stored at [0, pos-1].
* QuotaExceededException is thrown if it violates quota limit */
private <T extends INode> T addChild(INode[] pathComponents, int pos,
T child, long childDiskspace, boolean inheritPermission,
boolean checkQuota) throws QuotaExceededException {
+ // The filesystem limits are not really quotas, so this check may appear
+ // odd. It's because a rename operation deletes the src, tries to add
+ // to the dest, if that fails, re-adds the src from whence it came.
+ // The rename code disables the quota when it's restoring to the
+ // original location because a quota violation would cause the item
+ // to go "poof". The fs limits must be bypassed for the same reason.
+ if (checkQuota) {
+ verifyFsLimits(pathComponents, pos, child);
+ }
+
INode.DirCounts counts = new INode.DirCounts();
child.spaceConsumedInTree(counts);
if (childDiskspace < 0) {
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Apr 29 03:03:25 2011
@@ -527,6 +527,8 @@ public class FSEditLogLoader {
}
validateChecksum(in, checksum, numEdits);
}
+ } catch (IOException ex) {
+ check203UpgradeFailure(logVersion, ex);
} finally {
if(closeOnExit)
in.close();
@@ -670,4 +672,25 @@ public class FSEditLogLoader {
}
return blocks;
}
+
+ /**
+ * Rethrow an edit log loading failure, replacing it with an explanatory
+ * exception when the edit log was written by release 0.20.203.
+ *
+ * Release 0.20.203 uses opcodes that conflict with later releases, so its
+ * edit log may fail to load during an upgrade. The operator must first
+ * restart the namenode on the old release (which empties the edit log and
+ * saves the namespace) and then resume the upgrade.
+ *
+ * @param logVersion layout version of the edit log being loaded
+ * @param ex the exception raised while loading the edit log
+ * @throws IOException always: a new exception describing the 0.20.203
+ * upgrade procedure (with {@code ex} as its cause) when the log
+ * version is a 0.20.203 layout other than the current one, otherwise
+ * {@code ex} itself
+ */
+ private void check203UpgradeFailure(int logVersion, IOException ex)
+ throws IOException {
+ // Release 0.20.203 has opcodes that conflict with later releases. The
+ // edit log must be emptied by restarting the namenode on the old release
+ // before proceeding with the upgrade.
+ if (Storage.is203LayoutVersion(logVersion)
+ && logVersion != FSConstants.LAYOUT_VERSION) {
+ // Each fragment supplies exactly one separating space.
+ String msg = "During upgrade failed to load the editlog version "
+ + logVersion + " from release 0.20.203. Please go back to the old"
+ + " release and restart the namenode. This empties the editlog"
+ + " and saves the namespace. Resume the upgrade after this step.";
+ throw new IOException(msg, ex);
+ }
+ throw ex;
+ }
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Apr 29 03:03:25 2011
@@ -71,8 +71,6 @@ public class FSImage implements NNStorag
private static final SimpleDateFormat DATE_FORM =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- private static final int FIRST_TXNID_BASED_LAYOUT_VERSION=-32;
-
// checkpoint states
enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; }
@@ -520,9 +518,9 @@ public class FSImage implements NNStorag
// (ie edits_<txnid>) then use the new inspector, which will ignore
// the old format dirs.
FSImageStorageInspector inspector;
- if (minLayoutVersion <= FIRST_TXNID_BASED_LAYOUT_VERSION) {
+ if (minLayoutVersion <= FSConstants.FIRST_TXNID_BASED_LAYOUT_VERSION) {
inspector = new FSImageTransactionalStorageInspector();
- if (maxLayoutVersion > FIRST_TXNID_BASED_LAYOUT_VERSION) {
+ if (maxLayoutVersion > FSConstants.FIRST_TXNID_BASED_LAYOUT_VERSION) {
LOG.warn("Ignoring one or more storage directories with old layouts");
}
} else {
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Apr 29 03:03:25 2011
@@ -238,7 +238,9 @@ public class FSNamesystem implements FSC
public Daemon lmthread = null; // LeaseMonitor thread
Daemon smmthread = null; // SafeModeMonitor thread
public Daemon replthread = null; // Replication thread
-
+ Daemon nnrmthread = null; // NamenodeResourceMonitor thread
+
+ private volatile boolean hasResourcesAvailable = false;
private volatile boolean fsRunning = true;
long systemStart = 0;
@@ -249,6 +251,13 @@ public class FSNamesystem implements FSC
private long heartbeatExpireInterval;
//replicationRecheckInterval is how often namenode checks for new replication work
private long replicationRecheckInterval;
+
+ //resourceRecheckInterval is how often namenode checks for the disk space availability
+ private long resourceRecheckInterval;
+
+ // The actual resource checker instance.
+ NameNodeResourceChecker nnResourceChecker;
+
private FsServerDefaults serverDefaults;
// allow appending to hdfs files
private boolean supportAppends = true;
@@ -299,6 +308,11 @@ public class FSNamesystem implements FSC
*/
private void initialize(Configuration conf, FSImage fsImage)
throws IOException {
+ resourceRecheckInterval =
+ conf.getLong(DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
+ nnResourceChecker = new NameNodeResourceChecker(conf);
+ checkAvailableResources();
this.systemStart = now();
this.blockManager = new BlockManager(this, conf);
this.fsLock = new ReentrantReadWriteLock(true); // fair locking
@@ -346,6 +360,9 @@ public class FSNamesystem implements FSC
lmthread.start();
replthread.start();
+ this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
+ nnrmthread.start();
+
this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
@@ -358,7 +375,7 @@ public class FSNamesystem implements FSC
ScriptBasedMapping.class,
DNSToSwitchMapping.class), conf);
- /* If the dns to swith mapping supports cache, resolve network
+ /* If the dns to switch mapping supports cache, resolve network
* locations of those hosts in the include list,
* and store the mapping in the cache; so future calls to resolve
* will be fast.
@@ -556,6 +573,7 @@ public class FSNamesystem implements FSC
if (dnthread != null) dnthread.interrupt();
if (smmthread != null) smmthread.interrupt();
if (dtSecretManager != null) dtSecretManager.stopThreads();
+ if (nnrmthread != null) nnrmthread.interrupt();
} catch (Exception e) {
LOG.warn("Exception shutting down FSNamesystem", e);
} finally {
@@ -2839,6 +2857,57 @@ public class FSNamesystem implements FSC
}
/**
+ * Returns whether or not there were available resources at the last check of
+ * resources.
+ *
+ * This only reads the cached volatile {@code hasResourcesAvailable} flag
+ * written by checkAvailableResources(); it does not perform a new check.
+ *
+ * @return true if there were sufficient resources available, false otherwise.
+ */
+ private boolean nameNodeHasResourcesAvailable() {
+ return hasResourcesAvailable;
+ }
+
+ /**
+ * Perform resource checks and cache the results.
+ *
+ * Asks NameNodeResourceChecker.hasAvailableDiskSpace() whether enough disk
+ * space is available and stores the answer in the volatile
+ * {@code hasResourcesAvailable} field, which nameNodeHasResourcesAvailable()
+ * returns.
+ *
+ * @throws IOException if the resource checker throws it
+ */
+ private void checkAvailableResources() throws IOException {
+ hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
+ }
+
+ /**
+ * While {@code fsRunning} is true, refreshes the cached resource state every
+ * {@code resourceRecheckInterval} milliseconds via checkAvailableResources()
+ * (which calls NameNodeResourceChecker.hasAvailableDiskSpace()). Whenever
+ * disk space is found to be insufficient, the NN is put into safe mode with
+ * the resources-low flag set. This is repeated on every check while space
+ * stays low.
+ *
+ * This daemon never leaves safe mode itself. When resources recover it does
+ * nothing; per getTurnOffTip, safe mode entered for low resources must be
+ * turned off manually.
+ *
+ * Any exception other than InterruptedException is logged and ends the
+ * monitor.
+ */
+ class NameNodeResourceMonitor implements Runnable {
+ @Override
+ public void run () {
+ try {
+ while (fsRunning) {
+ checkAvailableResources();
+ if(!nameNodeHasResourcesAvailable()) {
+ String lowResourcesMsg = "NameNode low on available disk space. ";
+ if (!isInSafeMode()) {
+ FSNamesystem.LOG.warn(lowResourcesMsg + "Entering safe mode.");
+ } else {
+ FSNamesystem.LOG.warn(lowResourcesMsg + "Already in safe mode.");
+ }
+ // Called even when already in safe mode, so an existing (e.g.
+ // manual or startup) safe mode is also marked as resources-low.
+ enterSafeMode(true);
+ }
+ try {
+ Thread.sleep(resourceRecheckInterval);
+ } catch (InterruptedException ie) {
+ // Deliberately ignore: the loop condition re-checks fsRunning, so an
+ // interrupt (as sent during shutdown) only ends the loop once
+ // fsRunning is false. NOTE(review): assumes shutdown clears
+ // fsRunning before interrupting this thread -- confirm.
+ }
+ }
+ } catch (Exception e) {
+ FSNamesystem.LOG.error("Exception in NameNodeResourceMonitor: ", e);
+ }
+ }
+ }
+
+ /**
* Update access keys.
*/
void updateBlockKey() throws IOException {
@@ -3128,20 +3197,15 @@ public class FSNamesystem implements FSC
* update the (machine-->blocklist) and (block-->machinelist) tables.
*/
public void processReport(DatanodeID nodeID,
- BlockListAsLongs newReport
- ) throws IOException {
+ BlockListAsLongs newReport) throws IOException {
+ long startTime, endTime;
writeLock();
+ startTime = now(); //after acquiring write lock
try {
- long startTime = now();
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
- + "from " + nodeID.getName()+" " +
- newReport.getNumberOfBlocks()+" blocks");
- }
DatanodeDescriptor node = getDatanode(nodeID);
if (node == null || !node.isAlive) {
- throw new IOException("ProcessReport from dead or unregisterted node: "
+ throw new IOException("ProcessReport from dead or unregistered node: "
+ nodeID.getName());
}
// To minimize startup time, we discard any second (or later) block reports
@@ -3154,10 +3218,16 @@ public class FSNamesystem implements FSC
}
blockManager.processReport(node, newReport);
- NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
} finally {
+ endTime = now();
writeUnlock();
}
+
+ // Log the block report processing stats from Namenode perspective
+ NameNode.getNameNodeMetrics().blockReport.inc((int) (endTime - startTime));
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: from "
+ + nodeID.getName() + ", blocks: " + newReport.getNumberOfBlocks()
+ + ", processing time: " + (endTime - startTime) + " msecs");
}
/**
@@ -3825,6 +3895,8 @@ public class FSNamesystem implements FSC
private long lastStatusReport = 0;
/** flag indicating whether replication queues have been initialized */
private boolean initializedReplQueues = false;
+ /** Was safemode entered automatically because available resources were low. */
+ private boolean resourcesLow = false;
/**
* Creates SafeModeInfo when the name node enters
@@ -3849,14 +3921,15 @@ public class FSNamesystem implements FSC
}
/**
- * Creates SafeModeInfo when safe mode is entered manually.
+ * Creates SafeModeInfo when safe mode is entered manually, or because
+ * available resources are low.
*
* The {@link #threshold} is set to 1.5 so that it could never be reached.
* {@link #blockTotal} is set to -1 to indicate that safe mode is manual.
*
* @see SafeModeInfo
*/
- private SafeModeInfo() {
+ private SafeModeInfo(boolean resourcesLow) {
this.threshold = 1.5f; // this threshold can never be reached
this.datanodeThreshold = Integer.MAX_VALUE;
this.extension = Integer.MAX_VALUE;
@@ -3865,6 +3938,7 @@ public class FSNamesystem implements FSC
this.blockTotal = -1;
this.blockSafe = -1;
this.reached = -1;
+ this.resourcesLow = resourcesLow;
enter();
reportStatus("STATE* Safe mode is ON.", true);
}
@@ -3915,7 +3989,7 @@ public class FSNamesystem implements FSC
}
if(needUpgrade) {
// switch to manual safe mode
- safeMode = new SafeModeInfo();
+ safeMode = new SafeModeInfo(false);
return;
}
}
@@ -3948,8 +4022,13 @@ public class FSNamesystem implements FSC
if (isPopulatingReplQueues()) {
LOG.warn("Replication queues already initialized.");
}
+ long startTimeMisReplicatedScan = now();
blockManager.processMisReplicatedBlocks();
initializedReplQueues = true;
+ NameNode.stateChangeLog.info("STATE* Replication Queue initialization "
+ + "scan for invalid, over- and under-replicated blocks "
+ + "completed in " + (now() - startTimeMisReplicatedScan)
+ + " msec");
}
/**
@@ -3982,7 +4061,8 @@ public class FSNamesystem implements FSC
*/
boolean needEnter() {
+ // Safe mode is needed while too few blocks are safe (unless the threshold
+ // is 0), too few datanodes are live, or the last resource check found
+ // insufficient disk space.
return (threshold != 0 && blockSafe < blockThreshold) ||
- (getNumLiveDataNodes() < datanodeThreshold);
+ (getNumLiveDataNodes() < datanodeThreshold) ||
+ (!nameNodeHasResourcesAvailable());
}
/**
@@ -4054,10 +4134,11 @@ public class FSNamesystem implements FSC
}
/**
- * Check if safe mode was entered manually or at startup.
+ * Check if safe mode was entered manually, as opposed to automatically (at
+ * startup, or because resources such as disk space are low).
+ *
+ * @return true if the extension is Integer.MAX_VALUE (the manual-mode
+ * marker) and the resources-low flag is not set
*/
boolean isManual() {
- return extension == Integer.MAX_VALUE;
+ return extension == Integer.MAX_VALUE && !resourcesLow;
}
/**
@@ -4068,12 +4149,31 @@ public class FSNamesystem implements FSC
}
/**
+ * Check if safe mode was entered due to resources being low.
+ *
+ * @return the resources-low flag of this safe mode instance, set either at
+ * construction or later via setResourcesLow()
+ */
+ boolean areResourcesLow() {
+ return resourcesLow;
+ }
+
+ /**
+ * Set that resources are low for this instance of safe mode.
+ *
+ * This only sets the flag; it never clears it. Once set, isManual() returns
+ * false for this instance.
+ */
+ void setResourcesLow() {
+ resourcesLow = true;
+ }
+
+ /**
* A tip on how safe mode is to be turned off: manually or automatically.
*/
String getTurnOffTip() {
if(reached < 0)
return "Safe mode is OFF.";
- String leaveMsg = "Safe mode will be turned off automatically";
+ String leaveMsg = "";
+ if (areResourcesLow()) {
+ leaveMsg = "Resources are low on NN. Safe mode must be turned off manually";
+ } else {
+ leaveMsg = "Safe mode will be turned off automatically";
+ }
if(isManual()) {
if(getDistributedUpgradeState())
return leaveMsg + " upon completion of " +
@@ -4081,6 +4181,7 @@ public class FSNamesystem implements FSC
getDistributedUpgradeStatus() + "%";
leaveMsg = "Use \"hdfs dfsadmin -safemode leave\" to turn safe mode off";
}
+
if(blockTotal < 0)
return leaveMsg + ".";
@@ -4203,7 +4304,7 @@ public class FSNamesystem implements FSC
leaveSafeMode(false);
break;
case SAFEMODE_ENTER: // enter safe mode
- enterSafeMode();
+ enterSafeMode(false);
break;
}
}
@@ -4308,7 +4409,7 @@ public class FSNamesystem implements FSC
* Enter safe mode manually.
* @throws IOException
*/
- void enterSafeMode() throws IOException {
+ void enterSafeMode(boolean resourcesLow) throws IOException {
writeLock();
try {
// Ensure that any concurrent operations have been fully synced
@@ -4316,9 +4417,12 @@ public class FSNamesystem implements FSC
// is entirely stable on disk as soon as we're in safe mode.
getEditLog().logSyncAll();
if (!isInSafeMode()) {
- safeMode = new SafeModeInfo();
+ safeMode = new SafeModeInfo(resourcesLow);
return;
}
+ if (resourcesLow) {
+ safeMode.setResourcesLow();
+ }
safeMode.setManual();
NameNode.stateChangeLog.info("STATE* Safe mode is ON. "
+ safeMode.getTurnOffTip());