You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/12 20:43:06 UTC
svn commit: r528079 - in /lucene/hadoop/trunk: ./ conf/
src/java/org/apache/hadoop/dfs/
Author: cutting
Date: Thu Apr 12 11:43:04 2007
New Revision: 528079
URL: http://svn.apache.org/viewvc?view=rev&rev=528079
Log:
HADOOP-1093. Fix a race condition in HDFS where blocks were sometimes erased before they were reported written. Contributed by Dhruba.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=528079&r1=528078&r2=528079
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Apr 12 11:43:04 2007
@@ -180,6 +180,10 @@
track maps_running and reduces_running.
(Michael Bieniosek via cutting)
+55. HADOOP-1093. Fix a race condition in HDFS where blocks were
+ sometimes erased before they were reported written.
+ (Dhruba Borthakur via cutting)
+
Release 0.12.3 - 2007-04-06
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=528079&r1=528078&r2=528079
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Thu Apr 12 11:43:04 2007
@@ -348,6 +348,12 @@
</property>
<property>
+ <name>dfs.namenode.handler.count</name>
+ <value>10</value>
+ <description>The number of server threads for the namenode.</description>
+</property>
+
+<property>
<name>dfs.safemode.threshold.pct</name>
<value>0.999f</value>
<description>
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=528079&r1=528078&r2=528079
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Thu Apr 12 11:43:04 2007
@@ -30,9 +30,9 @@
interface ClientProtocol extends VersionedProtocol {
/*
- * 11: metasave() added
+ * 11: metasave() added and reportWrittenBlock() removed.
*/
- public static final long versionID = 10L;
+ public static final long versionID = 11L;
///////////////////////////////////////
// File contents
@@ -86,14 +86,6 @@
public boolean setReplication( String src,
short replication
) throws IOException;
-
- /**
- * A client that has written a block of data can report completion
- * back to the NameNode with reportWrittenBlock(). Clients cannot
- * obtain an additional block until the previous one has either been
- * reported as written or abandoned.
- */
- public void reportWrittenBlock(LocatedBlock b) throws IOException;
/**
* If the client has not yet called reportWrittenBlock(), it can
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=528079&r1=528078&r2=528079
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Apr 12 11:43:04 2007
@@ -1120,7 +1120,7 @@
//
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
out.write(OP_WRITE_BLOCK);
- out.writeBoolean(false);
+ out.writeBoolean(true);
block.write(out);
out.writeInt(nodes.length);
for (int i = 0; i < nodes.length; i++) {
@@ -1164,6 +1164,7 @@
private LocatedBlock locateFollowingBlock(long start
) throws IOException {
int retries = 5;
+ long sleeptime = 400;
while (true) {
long localstart = System.currentTimeMillis();
while (true) {
@@ -1183,7 +1184,9 @@
" seconds");
}
try {
- Thread.sleep(400);
+ LOG.debug("NotReplicatedYetException sleeping " + src +
+ " retries left " + retries);
+ Thread.sleep(sleeptime);
} catch (InterruptedException ie) {
}
}
@@ -1290,6 +1293,7 @@
* We're done writing to the current block.
*/
private synchronized void endBlock() throws IOException {
+ long sleeptime = 400;
//
// Done with local copy
//
@@ -1321,6 +1325,10 @@
if (remainingAttempts == 0) {
throw ie;
}
+ try {
+ Thread.sleep(sleeptime);
+ } catch (InterruptedException e) {
+ }
} finally {
in.close();
}
@@ -1360,7 +1368,6 @@
LocatedBlock lb = new LocatedBlock();
lb.readFields(blockReplyStream);
- namenode.reportWrittenBlock(lb);
s.close();
s = null;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=528079&r1=528078&r2=528079
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Apr 12 11:43:04 2007
@@ -448,20 +448,6 @@
if( ! processCommand( cmd ) )
continue;
}
-
- // send block report
- if (now - lastBlockReport > blockReportInterval) {
- //
- // Send latest blockinfo report if timer has expired.
- // Get back a list of local block(s) that are obsolete
- // and can be safely GC'ed.
- //
- DatanodeCommand cmd = namenode.blockReport( dnRegistration,
- data.getBlockReport());
- processCommand( cmd );
- lastBlockReport = now;
- continue;
- }
// check if there are newly received blocks
Block [] blockArray=null;
@@ -481,6 +467,19 @@
}
}
}
+
+ // send block report
+ if (now - lastBlockReport > blockReportInterval) {
+ //
+ // Send latest blockinfo report if timer has expired.
+ // Get back a list of local block(s) that are obsolete
+ // and can be safely GC'ed.
+ //
+ DatanodeCommand cmd = namenode.blockReport( dnRegistration,
+ data.getBlockReport());
+ processCommand( cmd );
+ lastBlockReport = now;
+ }
//
// There is no work to do; sleep until hearbeat timer elapses,
@@ -855,7 +854,9 @@
//
// Process incoming data, copy to disk and
- // maybe to network.
+ // maybe to network. First copy to the network before
+ // writing to local disk so that all datanodes might
+ // write to local disk in parallel.
//
boolean anotherChunk = len != 0;
byte buf[] = new byte[BUFFER_SIZE];
@@ -867,17 +868,6 @@
throw new EOFException("EOF reading from "+s.toString());
}
if (bytesRead > 0) {
- try {
- out.write(buf, 0, bytesRead);
- myMetrics.wroteBytes(bytesRead);
- } catch (IOException iex) {
- if (iex.getMessage().startsWith("No space left on device")) {
- throw new DiskOutOfSpaceException("No space left on device");
- } else {
- shutdown();
- throw iex;
- }
- }
if (out2 != null) {
try {
out2.write(buf, 0, bytesRead);
@@ -897,6 +887,17 @@
out2 = null;
in2 = null;
}
+ }
+ }
+ try {
+ out.write(buf, 0, bytesRead);
+ myMetrics.wroteBytes(bytesRead);
+ } catch (IOException iex) {
+ if (iex.getMessage().startsWith("No space left on device")) {
+ throw new DiskOutOfSpaceException("No space left on device");
+ } else {
+ shutdown();
+ throw iex;
}
}
len -= bytesRead;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=528079&r1=528078&r2=528079
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Apr 12 11:43:04 2007
@@ -883,17 +883,14 @@
" owned by " + pendingFile.getClientName() +
" and appended by " + clientName);
}
- if (dir.getFile(src) != null) {
- throw new IOException("File " + src + " created during write");
- }
//
// If we fail this, bad things happen!
//
- if (!checkFileProgress(src)) {
- throw new NotReplicatedYetException("Not replicated yet");
+ if (!checkFileProgress(pendingFile, false)) {
+ throw new NotReplicatedYetException("Not replicated yet:" + src);
}
-
+
// Get the array of replication targets
DatanodeDescriptor clientNode = pendingFile.getClientNode();
DatanodeDescriptor targets[] = replicator.chooseTarget(
@@ -977,17 +974,18 @@
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder );
if( isInSafeMode() )
throw new SafeModeException( "Cannot complete file " + src, safeMode );
- if (dir.getFile(src) != null || pendingCreates.get(src) == null) {
+ FileUnderConstruction pendingFile = pendingCreates.get(src);
+
+ if (dir.getFile(src) != null || pendingFile == null) {
NameNode.stateChangeLog.warn( "DIR* NameSystem.completeFile: "
+ "failed to complete " + src
+ " because dir.getFile()==" + dir.getFile(src)
- + " and " + pendingCreates.get(src));
+ + " and " + pendingFile);
return OPERATION_FAILED;
- } else if (! checkFileProgress(src)) {
+ } else if (! checkFileProgress(pendingFile, true)) {
return STILL_WAITING;
}
- FileUnderConstruction pendingFile = pendingCreates.get(src);
Collection<Block> blocks = pendingFile.getBlocks();
int nrBlocks = blocks.size();
Block pendingBlocks[] = blocks.toArray(new Block[nrBlocks]);
@@ -1075,15 +1073,29 @@
/**
* Check that the indicated file's blocks are present and
- * replicated. If not, return false.
+ * replicated. If not, return false. If checkall is true, then check
+ * all blocks; otherwise check only the penultimate block.
*/
- synchronized boolean checkFileProgress(UTF8 src) {
- FileUnderConstruction v = pendingCreates.get(src);
-
- for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
+ synchronized boolean checkFileProgress(FileUnderConstruction v, boolean checkall) {
+ if (checkall) {
+ //
+ // check all blocks of the file.
+ //
+ for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
if ( blocksMap.numNodes(it.next()) < this.minReplication ) {
return false;
}
+ }
+ } else {
+ //
+ // check the penultimate block of this file
+ //
+ Block b = v.getPenultimateBlock();
+ if (b != null) {
+ if (blocksMap.numNodes(b) < this.minReplication) {
+ return false;
+ }
+ }
}
return true;
}
@@ -3441,6 +3453,16 @@
public DatanodeDescriptor getClientNode() {
return clientNode;
+ }
+
+ /**
+ * Return the penultimate allocated block for this file, or null if the file has fewer than two blocks.
+ */
+ public Block getPenultimateBlock() {
+ if (blocks.size() <= 1) {
+ return null;
+ }
+ return ((ArrayList<Block>)blocks).get(blocks.size() - 2);
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=528079&r1=528078&r2=528079
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Thu Apr 12 11:43:04 2007
@@ -324,23 +324,6 @@
}
/**
- * The client can report in a set written blocks that it wrote.
- * These blocks are reported via the client instead of the datanode
- * to prevent weird heartbeat race conditions.
- */
- public void reportWrittenBlock(LocatedBlock lb) throws IOException {
- Block b = lb.getBlock();
- DatanodeInfo targets[] = lb.getLocations();
- stateChangeLog.debug("*BLOCK* NameNode.reportWrittenBlock"
- +": " + b.getBlockName() +" is written to "
- +targets.length + " locations" );
-
- for (int i = 0; i < targets.length; i++) {
- namesystem.blockReceived( targets[i], b );
- }
- }
-
- /**
* The client needs to give up on the block.
*/
public void abandonBlock(Block b, String src) throws IOException {