Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/12 20:43:06 UTC

svn commit: r528079 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/dfs/

Author: cutting
Date: Thu Apr 12 11:43:04 2007
New Revision: 528079

URL: http://svn.apache.org/viewvc?view=rev&rev=528079
Log:
HADOOP-1093.  Fix a race condition in HDFS where blocks were sometimes erased before they were reported written.  Contributed by Dhruba.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=528079&r1=528078&r2=528079
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Apr 12 11:43:04 2007
@@ -180,6 +180,10 @@
     track maps_running and reduces_running.
     (Michael Bieniosek via cutting)
 
+55. HADOOP-1093.  Fix a race condition in HDFS where blocks were
+    sometimes erased before they were reported written.
+    (Dhruba Borthakur via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=528079&r1=528078&r2=528079
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Thu Apr 12 11:43:04 2007
@@ -348,6 +348,12 @@
 </property>
 
 <property>
+  <name>dfs.namenode.handler.count</name>
+  <value>10</value>
+  <description>The number of server threads for the namenode.</description>
+</property>
+
+<property>
   <name>dfs.safemode.threshold.pct</name>
   <value>0.999f</value>
   <description>

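Note on the new key: dfs.namenode.handler.count can be overridden like any other Hadoop property. A minimal sketch using the Configuration API (the key and its default of 10 come from the hunk above; treating the namenode's RPC server as the consumer of this value is an assumption, since the read site is not part of this diff):

    import org.apache.hadoop.conf.Configuration;

    public class HandlerCountSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Override the default of 10 namenode server threads; a busy
        // cluster would presumably raise this (illustrative value only).
        conf.setInt("dfs.namenode.handler.count", 20);
        System.out.println(conf.getInt("dfs.namenode.handler.count", 10));
      }
    }

The same override can of course live in hadoop-site.xml, which takes precedence over hadoop-default.xml.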
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=528079&r1=528078&r2=528079
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Thu Apr 12 11:43:04 2007
@@ -30,9 +30,9 @@
 interface ClientProtocol extends VersionedProtocol {
 
     /*
-     * 11: metasave() added
+     * 11: metasave() added and reportWrittenBlock() removed.
      */
-    public static final long versionID = 10L;  
+    public static final long versionID = 11L;  
   
     ///////////////////////////////////////
     // File contents
@@ -86,14 +86,6 @@
     public boolean setReplication( String src, 
                                 short replication
                               ) throws IOException;
-
-    /**
-     * A client that has written a block of data can report completion
-     * back to the NameNode with reportWrittenBlock().  Clients cannot
-     * obtain an additional block until the previous one has either been 
-     * reported as written or abandoned.
-     */
-    public void reportWrittenBlock(LocatedBlock b) throws IOException;
 
     /**
      * If the client has not yet called reportWrittenBlock(), it can

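Bumping versionID from 10 to 11 turns the removal of reportWrittenBlock() into a hard incompatibility rather than a silent one. A toy model of why that matters (this is not Hadoop's actual IPC handshake, only an illustration of the failure mode a stale client should hit):

    import java.io.IOException;

    public class VersionCheckSketch {
      static final long SERVER_VERSION = 11L;  // matches the new versionID

      static void handshake(long clientVersion) throws IOException {
        if (clientVersion != SERVER_VERSION) {
          // An old client that would still try to call the removed
          // reportWrittenBlock() is rejected up front.
          throw new IOException("Protocol version mismatch: client "
              + clientVersion + " vs. server " + SERVER_VERSION);
        }
      }

      public static void main(String[] args) throws IOException {
        handshake(11L);  // current client: proceeds
        handshake(10L);  // pre-HADOOP-1093 client: fails fast here
      }
    }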
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=528079&r1=528078&r2=528079
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Apr 12 11:43:04 2007
@@ -1120,7 +1120,7 @@
                 //
                 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
                 out.write(OP_WRITE_BLOCK);
-                out.writeBoolean(false);
+                out.writeBoolean(true);
                 block.write(out);
                 out.writeInt(nodes.length);
                 for (int i = 0; i < nodes.length; i++) {
@@ -1164,6 +1164,7 @@
         private LocatedBlock locateFollowingBlock(long start
                                                   ) throws IOException {     
           int retries = 5;
+          long sleeptime = 400;
           while (true) {
             long localstart = System.currentTimeMillis();
             while (true) {
@@ -1183,7 +1184,9 @@
                              " seconds");
                   }
                   try {
-                    Thread.sleep(400);
+                    LOG.debug("NotReplicatedYetException sleeping " + src +
+                              " retries left " + retries);
+                    Thread.sleep(sleeptime);
                   } catch (InterruptedException ie) {
                   }
                 }                
@@ -1290,6 +1293,7 @@
          * We're done writing to the current block.
          */
         private synchronized void endBlock() throws IOException {
+            long sleeptime = 400;
             //
             // Done with local copy
             //
@@ -1321,6 +1325,10 @@
                     if (remainingAttempts == 0) {
                       throw ie;
                     }
+                    try {
+                      Thread.sleep(sleeptime);
+                    } catch (InterruptedException e) {
+                    }
                 } finally {
                   in.close();
                 }
@@ -1360,7 +1368,6 @@
                     
             LocatedBlock lb = new LocatedBlock();
             lb.readFields(blockReplyStream);
-            namenode.reportWrittenBlock(lb);
 
             s.close();
             s = null;

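Both locateFollowingBlock() and endBlock() now back off between attempts instead of spinning, giving datanodes time to report the previous block. A standalone sketch of that wait-and-retry shape (the 5 retries and 400 ms sleep mirror the diff; previousBlockReplicated() is a hypothetical stand-in for the namenode call that throws NotReplicatedYetException):

    public class RetrySketch {
      static int calls = 0;

      // Stand-in for asking the namenode whether the previously written
      // block has reached minimum replication; succeeds on the third try.
      static boolean previousBlockReplicated() {
        return ++calls >= 3;
      }

      public static void main(String[] args) throws Exception {
        int retries = 5;       // as in locateFollowingBlock()
        long sleeptime = 400;  // milliseconds, as in the diff
        while (!previousBlockReplicated()) {
          if (--retries == 0) {
            throw new java.io.IOException("Not replicated yet");
          }
          try {
            Thread.sleep(sleeptime);
          } catch (InterruptedException ie) {
          }
        }
        System.out.println("next block allocated after " + calls + " attempts");
      }
    }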
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=528079&r1=528078&r2=528079
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Apr 12 11:43:04 2007
@@ -448,20 +448,6 @@
             if( ! processCommand( cmd ) )
               continue;
           }
-
-          // send block report
-          if (now - lastBlockReport > blockReportInterval) {
-            //
-            // Send latest blockinfo report if timer has expired.
-            // Get back a list of local block(s) that are obsolete
-            // and can be safely GC'ed.
-            //
-            DatanodeCommand cmd = namenode.blockReport( dnRegistration,
-                                                        data.getBlockReport());
-            processCommand( cmd );
-            lastBlockReport = now;
-            continue;
-          }
             
           // check if there are newly received blocks
           Block [] blockArray=null;
@@ -481,6 +467,19 @@
               }
             }
           }
+
+          // send block report
+          if (now - lastBlockReport > blockReportInterval) {
+            //
+            // Send latest blockinfo report if timer has expired.
+            // Get back a list of local block(s) that are obsolete
+            // and can be safely GC'ed.
+            //
+            DatanodeCommand cmd = namenode.blockReport( dnRegistration,
+                                                        data.getBlockReport());
+            processCommand( cmd );
+            lastBlockReport = now;
+          }
             
           //
           // There is no work to do;  sleep until heartbeat timer elapses, 
@@ -855,7 +854,9 @@
               
               //
               // Process incoming data, copy to disk and
-              // maybe to network.
+              // maybe to network. First copy to the network before
+              // writing to local disk so that all datanodes might
+              // write to local disk in parallel.
               //
               boolean anotherChunk = len != 0;
               byte buf[] = new byte[BUFFER_SIZE];
@@ -867,17 +868,6 @@
                     throw new EOFException("EOF reading from "+s.toString());
                   }
                   if (bytesRead > 0) {
-                    try {
-                      out.write(buf, 0, bytesRead);
-                      myMetrics.wroteBytes(bytesRead);
-                    } catch (IOException iex) {
-                      if (iex.getMessage().startsWith("No space left on device")) {
-                    	  throw new DiskOutOfSpaceException("No space left on device");
-                      } else {
-                        shutdown();
-                        throw iex;
-                      }
-                    }
                     if (out2 != null) {
                       try {
                         out2.write(buf, 0, bytesRead);
@@ -897,6 +887,17 @@
                           out2 = null;
                           in2 = null;
                         }
+                      }
+                    }
+                    try {
+                      out.write(buf, 0, bytesRead);
+                      myMetrics.wroteBytes(bytesRead);
+                    } catch (IOException iex) {
+                      if (iex.getMessage().startsWith("No space left on device")) {
+                    	  throw new DiskOutOfSpaceException("No space left on device");
+                      } else {
+                        shutdown();
+                        throw iex;
                       }
                     }
                     len -= bytesRead;

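The reordering of the datanode's inner copy loop -- forward each chunk to the next datanode before writing it locally -- lets every node in the pipeline overlap its disk write with the downstream transfer. A condensed, self-contained sketch of that ordering (stream roles mirror the diff, where out is the local disk and out2 the next datanode; the surrounding error handling is omitted):

    import java.io.*;

    public class PipelineOrderSketch {
      static void relay(InputStream in, OutputStream localDisk,
                        OutputStream nextDatanode) throws IOException {
        byte[] buf = new byte[4096];
        int bytesRead;
        while ((bytesRead = in.read(buf)) != -1) {
          if (nextDatanode != null) {
            nextDatanode.write(buf, 0, bytesRead);  // network first ...
          }
          localDisk.write(buf, 0, bytesRead);       // ... then local disk
        }
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream disk = new ByteArrayOutputStream();
        ByteArrayOutputStream next = new ByteArrayOutputStream();
        relay(new ByteArrayInputStream("block data".getBytes()), disk, next);
        System.out.println(disk.size() + " bytes on disk, "
            + next.size() + " bytes forwarded");
      }
    }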
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=528079&r1=528078&r2=528079
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Apr 12 11:43:04 2007
@@ -883,17 +883,14 @@
               " owned by " + pendingFile.getClientName() + 
               " and appended by " + clientName);
         }
-        if (dir.getFile(src) != null) {
-          throw new IOException("File " + src + " created during write");
-        }
 
         //
         // If we fail this, bad things happen!
         //
-        if (!checkFileProgress(src)) {
-          throw new NotReplicatedYetException("Not replicated yet");
+        if (!checkFileProgress(pendingFile, false)) {
+          throw new NotReplicatedYetException("Not replicated yet:" + src);
         }
-        
+
         // Get the array of replication targets
         DatanodeDescriptor clientNode = pendingFile.getClientNode();
         DatanodeDescriptor targets[] = replicator.chooseTarget(
@@ -977,17 +974,18 @@
         NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder );
         if( isInSafeMode() )
           throw new SafeModeException( "Cannot complete file " + src, safeMode );
-        if (dir.getFile(src) != null || pendingCreates.get(src) == null) {
+        FileUnderConstruction pendingFile = pendingCreates.get(src);
+
+        if (dir.getFile(src) != null || pendingFile == null) {
             NameNode.stateChangeLog.warn( "DIR* NameSystem.completeFile: "
                     + "failed to complete " + src
                     + " because dir.getFile()==" + dir.getFile(src) 
-                    + " and " + pendingCreates.get(src));
+                    + " and " + pendingFile);
             return OPERATION_FAILED;
-        } else if (! checkFileProgress(src)) {
+        } else if (! checkFileProgress(pendingFile, true)) {
             return STILL_WAITING;
         }
         
-        FileUnderConstruction pendingFile = pendingCreates.get(src);
         Collection<Block> blocks = pendingFile.getBlocks();
         int nrBlocks = blocks.size();
         Block pendingBlocks[] = blocks.toArray(new Block[nrBlocks]);
@@ -1075,15 +1073,29 @@
 
     /**
      * Check that the indicated file's blocks are present and
-     * replicated.  If not, return false.
+     * replicated.  If not, return false. If checkall is true, then check
+     * all blocks, otherwise check only penultimate block.
      */
-    synchronized boolean checkFileProgress(UTF8 src) {
-        FileUnderConstruction v = pendingCreates.get(src);
-
-        for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
+    synchronized boolean checkFileProgress(FileUnderConstruction v, boolean checkall) {
+        if (checkall) {
+          //
+          // check all blocks of the file.
+          //
+          for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
             if ( blocksMap.numNodes(it.next()) < this.minReplication ) {
                 return false;
             }
+          }
+        } else {
+          //
+          // check the penultimate block of this file
+          //
+          Block b = v.getPenultimateBlock();
+          if (b != null) {
+            if (blocksMap.numNodes(b) < this.minReplication) {
+                return false;
+            }
+          }
         }
         return true;
     }
@@ -3441,6 +3453,16 @@
 
       public DatanodeDescriptor getClientNode() {
         return clientNode;
+      }
+
+      /**
+       * Return the penultimate allocated block for this file
+       */
+      public Block getPenultimateBlock() {
+        if (blocks.size() <= 1) {
+          return null;
+        }
+        return ((ArrayList<Block>)blocks).get(blocks.size() - 2);
       }
     }
 

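The relaxed check in getAdditionalBlock() allows a client one unacknowledged block in flight: when asking for a new block, only the block before the most recent one must have reached minReplication, since replica reports for the newest block may still be arriving; completeFile() still checks every block. A small sketch of the penultimate selection (generic here; the real code operates on the file's Block list):

    import java.util.*;

    public class PenultimateSketch {
      // Mirrors getPenultimateBlock(): null when fewer than two blocks exist.
      static <T> T penultimate(List<T> blocks) {
        return blocks.size() <= 1 ? null : blocks.get(blocks.size() - 2);
      }

      public static void main(String[] args) {
        List<String> blocks = Arrays.asList("blk_1", "blk_2", "blk_3");
        // Before handing out a fourth block, only blk_2 must be minimally
        // replicated; blk_3's replica reports may still be in flight.
        System.out.println(penultimate(blocks));  // prints blk_2
      }
    }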
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=528079&r1=528078&r2=528079
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Thu Apr 12 11:43:04 2007
@@ -324,23 +324,6 @@
     }
 
     /**
-     * The client can report in a set written blocks that it wrote.
-     * These blocks are reported via the client instead of the datanode
-     * to prevent weird heartbeat race conditions.
-     */
-    public void reportWrittenBlock(LocatedBlock lb) throws IOException {
-        Block b = lb.getBlock();        
-        DatanodeInfo targets[] = lb.getLocations();
-        stateChangeLog.debug("*BLOCK* NameNode.reportWrittenBlock"
-                +": " + b.getBlockName() +" is written to "
-                +targets.length + " locations" );
-
-        for (int i = 0; i < targets.length; i++) {
-            namesystem.blockReceived( targets[i], b );
-        }
-    }
-
-    /**
      * The client needs to give up on the block.
      */
     public void abandonBlock(Block b, String src) throws IOException {
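With reportWrittenBlock() gone, the namenode learns about new replicas only from the datanodes themselves, and the reordered DataNode loop above guarantees those notifications precede any full block report. A toy rendering of that ordering (println stands in for the namenode RPCs named in the diff):

    import java.util.*;

    public class ReportOrderSketch {
      public static void main(String[] args) {
        Queue<String> newlyReceived =
            new LinkedList<String>(Arrays.asList("blk_42"));
        List<String> fullReport = Arrays.asList("blk_41", "blk_42");

        // 1. Newly received blocks are reported first, so the namenode
        //    hears about blk_42 before it evaluates a full report.
        for (String b; (b = newlyReceived.poll()) != null; ) {
          System.out.println("blockReceived: " + b);
        }
        // 2. Only then does the periodic block report go out; it can no
        //    longer race ahead and make a freshly written block look like
        //    garbage to be erased.
        System.out.println("blockReport: " + fullReport);
      }
    }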