You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2008/10/28 01:04:08 UTC

svn commit: r708396 - in /hadoop/core/branches/branch-0.19: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

Author: szetszwo
Date: Mon Oct 27 17:04:08 2008
New Revision: 708396

URL: http://svn.apache.org/viewvc?rev=708396&view=rev
Log:
HADOOP-4517. Release FSDataset lock before joining ongoing create threads. (szetszwo)

Modified:
    hadoop/core/branches/branch-0.19/CHANGES.txt
    hadoop/core/branches/branch-0.19/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=708396&r1=708395&r2=708396&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Mon Oct 27 17:04:08 2008
@@ -997,6 +997,9 @@
     HADOOP-4467. SerializationFactory now uses the current context ClassLoader
     allowing for user supplied Serialization instances. (Chris Wensel via
     acmurthy)
+
+    HADOOP-4517. Release FSDataset lock before joining ongoing create threads.
+    (szetszwo)
  
 Release 0.18.1 - 2008-09-17
 

Modified: hadoop/core/branches/branch-0.19/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=708396&r1=708395&r2=708396&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/core/branches/branch-0.19/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Mon Oct 27 17:04:08 2008
@@ -551,8 +551,8 @@
     
 
   static class ActiveFile {
-    File file;
-    List<Thread> threads = new ArrayList<Thread>(2);
+    final File file;
+    final List<Thread> threads = new ArrayList<Thread>(2);
 
     ActiveFile(File f, List<Thread> list) {
       file = f;
@@ -796,38 +796,65 @@
     }
   }
 
-  /** interrupt and wait for all ongoing create threads */
-  private synchronized void interruptOngoingCreates(Block b) {
-    //remove ongoingCreates threads
-    ActiveFile activefile = ongoingCreates.get(b);
-    if (activefile != null) {
-      for(Thread t : activefile.threads) {
+  /** {@inheritDoc} */
+  public void updateBlock(Block oldblock, Block newblock) throws IOException {
+    if (oldblock.getBlockId() != newblock.getBlockId()) {
+      throw new IOException("Cannot update oldblock (=" + oldblock
+          + ") to newblock (=" + newblock + ").");
+    }
+    
+    for(;;) {
+      final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
+      if (threads == null) {
+        return;
+      }
+
+      // interrupt and wait for all ongoing create threads
+      for(Thread t : threads) {
         t.interrupt();
       }
-      for(Thread t : activefile.threads) {
+      for(Thread t : threads) {
         try {
           t.join();
         } catch (InterruptedException e) {
-          DataNode.LOG.warn("interruptOngoingCreates: b=" + b
-              + ", activeFile=" + activefile + ", t=" + t, e);
+          DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
         }
       }
-      activefile.threads.clear();
     }
   }
-  /** {@inheritDoc} */
-  public synchronized void updateBlock(Block oldblock, Block newblock
-      ) throws IOException {
-    if (oldblock.getBlockId() != newblock.getBlockId()) {
-      throw new IOException("Cannot update oldblock (=" + oldblock
-          + ") to newblock (=" + newblock + ").");
+
+  /**
+   * Try to update an old block to a new block.
+   * If there are ongoing create threads running for the old block,
+   * the threads will be returned without updating the block. 
+   * 
+   * @return the ongoing create threads if there are any; otherwise, null.
+   */
+  private synchronized List<Thread> tryUpdateBlock(
+      Block oldblock, Block newblock) throws IOException {
+    //check ongoing create threads
+    final ActiveFile activefile = ongoingCreates.get(oldblock);
+    if (activefile != null && !activefile.threads.isEmpty()) {
+      //remove dead threads
+      for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
+        final Thread t = i.next();
+        if (!t.isAlive()) {
+          i.remove();
+        }
+      }
+
+      //return living threads
+      if (!activefile.threads.isEmpty()) {
+        return new ArrayList<Thread>(activefile.threads);
+      }
     }
+
+    //No ongoing create thread is alive.  Update block.
     File blockFile = findBlockFile(oldblock.getBlockId());
     if (blockFile == null) {
       throw new IOException("Block " + oldblock + " does not exist.");
     }
-    interruptOngoingCreates(oldblock);
-    
+
     File oldMetaFile = findMetaFile(blockFile);
     long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
     
@@ -866,6 +893,7 @@
     // paranoia! verify that the contents of the stored block 
     // matches the block file on disk.
     validateBlockMetadata(newblock);
+    return null;
   }
 
   static private void truncateBlock(File blockFile, File metaFile,