You are viewing a plain-text version of this content. (The canonical link to it was not preserved in this plain-text copy.)
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/06/04 19:52:27 UTC

svn commit: r663328 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/

Author: dhruba
Date: Wed Jun  4 10:52:27 2008
New Revision: 663328

URL: http://svn.apache.org/viewvc?rev=663328&view=rev
Log:
HADOOP-3113. An fsync invoked on an HDFS file now truly persists data! The datanode
moves blocks in the tmp directory to the real block directory on a datanode restart.
(dhruba)


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663328&r1=663327&r2=663328&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jun  4 10:52:27 2008
@@ -74,6 +74,10 @@
     HADOOP-3459. Change in the output format of dfs -ls to more closely match
     /bin/ls. New format is: perm repl owner group size date name (Mukund Madhugiri via omally)
 
+    HADOOP-3113. An fsync invoked on a HDFS file really really persists data! The datanode
+    moves blocks in the tmp directory to the real block directory on a datanode-restart.
+    (dhruba)
+
   NEW FEATURES
 
     HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=663328&r1=663327&r2=663328&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Jun  4 10:52:27 2008
@@ -3066,11 +3066,19 @@
   }
 
   /** {@inheritDoc} */
-  public void updateBlock(Block oldblock, Block newblock) throws IOException {
+  public void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("oldblock=" + oldblock + ", newblock=" + newblock);
     }
     data.updateBlock(oldblock, newblock);
+    if (finalize) {
+      data.finalizeBlock(newblock);
+      myMetrics.blocksWritten.inc(); 
+      notifyNamenodeReceivedBlock(newblock, EMPTY_DEL_HINT);
+      LOG.info("Received block " + newblock +
+                " of size " + newblock.getNumBytes() +
+                " as part of lease recovery.");
+    }
   }
 
   /** {@inheritDoc} */

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=663328&r1=663327&r2=663328&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Wed Jun  4 10:52:27 2008
@@ -299,11 +299,17 @@
       if (detachDir.exists()) {
         recoverDetachedBlocks(currentDir, detachDir);
       }
-      this.dataDir = new FSDir(currentDir);
+
+      // Files that were being written when the datanode was last shutdown
+      // are now moved back to the data directory. It is possible that
+      // in the future, we might want to do some sort of datanode-local
+      // recovery for these blocks. For example, crc validation.
+      //
       this.tmpDir = new File(parent, "tmp");
       if (tmpDir.exists()) {
-        FileUtil.fullyDelete(tmpDir);
+        recoverDetachedBlocks(currentDir, tmpDir);
       }
+      this.dataDir = new FSDir(currentDir);
       if (!tmpDir.mkdirs()) {
         if (!tmpDir.isDirectory()) {
           throw new IOException("Mkdirs failed to create " + tmpDir.toString());
@@ -651,8 +657,6 @@
   private HashMap<Block,DatanodeBlockInfo> volumeMap = null;
   static  Random random = new Random();
   
-  long blockWriteTimeout = 3600 * 1000;
-  
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -665,8 +669,6 @@
     volumes = new FSVolumeSet(volArray);
     volumeMap = new HashMap<Block, DatanodeBlockInfo>();
     volumes.getVolumeMap(volumeMap);
-    blockWriteTimeout = Math.max(
-         conf.getInt("dfs.datanode.block.write.timeout.sec", 3600), 1) * 1000L;
     registerMBean(storage.getStorageID());
   }
 
@@ -910,18 +912,8 @@
         threads = activeFile.threads;
         
         if (!isRecovery) {
-          // check how old is the temp file - wait 1 hour
-          if ((System.currentTimeMillis() - f.lastModified()) < 
-              blockWriteTimeout) {
-            throw new IOException("Block " + b +
+          throw new IOException("Block " + b +
                                   " has already been started (though not completed), and thus cannot be created.");
-          } else {
-            // stale temp file - remove
-            if (!f.delete()) {
-              throw new IOException("Can't write the block - unable to remove stale temp file " + f);
-            }
-            f = null;
-          }
         } else {
           for (Thread thread:threads) {
             thread.interrupt();
@@ -1016,7 +1008,11 @@
    * Complete the block write!
    */
   public synchronized void finalizeBlock(Block b) throws IOException {
-    File f = ongoingCreates.get(b).file;
+    ActiveFile activeFile = ongoingCreates.get(b);
+    if (activeFile == null) {
+      throw new IOException("Block " + b + " is already finalized.");
+    }
+    File f = activeFile.file;
     if (f == null || !f.exists()) {
       throw new IOException("No temporary file " + f + " for block " + b);
     }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=663328&r1=663327&r2=663328&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Jun  4 10:52:27 2008
@@ -1118,7 +1118,7 @@
 
       // allocate new block record block locations in INode.
       newBlock = allocateBlock(src, pendingFile);
-      pendingFile.targets = targets;
+      pendingFile.setTargets(targets);
     }
         
     // Create next block
@@ -1638,8 +1638,31 @@
 
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
 
-    // Initialize lease recovery for pendingFile
+    // Initialize lease recovery for pendingFile. If there are no blocks 
+    // associated with this file, then reap lease immediately. Otherwise 
+    // renew the lease and trigger lease recovery.
+    if (pendingFile.getTargets().length == 0) {
+      if (pendingFile.getBlocks().length == 0) {
+        finalizeINodeFileUnderConstruction(src, pendingFile);
+        NameNode.stateChangeLog.warn("BLOCK*"
+          + " internalReleaseLease: No blocks found, lease removed.");
+        return;
+      }
+      // setup the Inode.targets for the last block from the blocksMap
+      //
+      Block[] blocks = pendingFile.getBlocks();
+      Block last = blocks[blocks.length-1];
+      DatanodeDescriptor[] targets = 
+         new DatanodeDescriptor[blocksMap.numNodes(last)];
+      Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
+      for (int i = 0; it != null && it.hasNext(); i++) {
+        targets[i] = it.next();
+      }
+      pendingFile.setTargets(targets);
+    }
+    // start lease recovery of the last block for this file.
     pendingFile.assignPrimaryDatanode();
+    leaseManager.renewLease(lease);
   }
 
   private void finalizeINodeFileUnderConstruction(String src,

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java?rev=663328&r1=663327&r2=663328&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java Wed Jun  4 10:52:27 2008
@@ -776,7 +776,7 @@
   DatanodeDescriptor clientNode = null; // if client is a cluster node too.
 
   private int primaryNodeIndex = -1; //the node working on lease recovery
-  DatanodeDescriptor[] targets = null;   //locations for last block
+  private DatanodeDescriptor[] targets = null;   //locations for last block
   
   INodeFileUnderConstruction() {}
 
@@ -833,6 +833,15 @@
     return true;
   }
 
+  DatanodeDescriptor[] getTargets() {
+    return targets;
+  }
+
+  void setTargets(DatanodeDescriptor[] targets) {
+    this.targets = targets;
+    this.primaryNodeIndex = -1;
+  }
+
   //
   // converts a INodeFileUnderConstruction into a INodeFile
   //
@@ -870,14 +879,14 @@
 
   /**
    * Initialize lease recovery for this object
-   * @return the chosen primary datanode
    */
   void assignPrimaryDatanode() {
     //assign the first alive datanode as the primary datanode
+
     if (targets.length == 0) {
       NameNode.stateChangeLog.warn("BLOCK*"
-          + " INodeFileUnderConstruction.initLeaseRecovery:"
-          + " all targets are not alive.");
+        + " INodeFileUnderConstruction.initLeaseRecovery:"
+        + " No blocks found, lease removed.");
     }
 
     int previous = primaryNodeIndex;

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java?rev=663328&r1=663327&r2=663328&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java Wed Jun  4 10:52:27 2008
@@ -30,9 +30,9 @@
   public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);
 
   /**
-   * 2: change updateGenerationStamp to updataBlock
+   * 3: added a finalize parameter to updateBlock
    */
-  public static final long versionID = 2L;
+  public static final long versionID = 3L;
 
   /** @return the BlockMetaDataInfo of a block;
    *  null if the block is not found 
@@ -42,5 +42,5 @@
   /**
    * Update the block to the new generation stamp and length.  
    */
-  void updateBlock(Block oldblock, Block newblock) throws IOException;
-}
\ No newline at end of file
+  void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException;
+}

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java?rev=663328&r1=663327&r2=663328&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java Wed Jun  4 10:52:27 2008
@@ -357,7 +357,6 @@
                   for(StringBytesWritable s : top.paths) {
                     fsnamesystem.internalReleaseLease(top, s.getString());
                   }
-                  renewLease(top);
                 } else {
                   break;
                 }
@@ -476,7 +475,7 @@
 
     for(BlockRecord r : syncList) {
       try {
-        r.datanode.updateBlock(r.block, newblock);
+        r.datanode.updateBlock(r.block, newblock, closeFile);
         successList.add(r.id);
       } catch (IOException e) {
         InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java?rev=663328&r1=663327&r2=663328&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java Wed Jun  4 10:52:27 2008
@@ -91,11 +91,11 @@
       //verify updateBlock
       Block newblock = new Block(
           b.getBlockId(), b.getNumBytes()/2, b.getGenerationStamp()+1);
-      idp.updateBlock(b, newblock);
+      idp.updateBlock(b, newblock, false);
       checkMetaInfo(newblock, idp, datanode.blockScanner);
     }
     finally {
       if (cluster != null) {cluster.shutdown();}
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery.java?rev=663328&r1=663327&r2=663328&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery.java Wed Jun  4 10:52:27 2008
@@ -100,7 +100,7 @@
       for(int i = 0; i < REPLICATION_NUM; i++) {
         newblocks[i] = new Block(lastblock.getBlockId(), newblocksizes[i],
             lastblock.getGenerationStamp());
-        idps[i].updateBlock(lastblock, newblocks[i]);
+        idps[i].updateBlock(lastblock, newblocks[i], false);
         checkMetaInfo(newblocks[i], idps[i]);
       }