Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2010/09/17 21:51:28 UTC

svn commit: r998286 [1/2] - in /hadoop/hdfs/branches/HDFS-1052: ./ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/ ...

Author: suresh
Date: Fri Sep 17 19:51:27 2010
New Revision: 998286

URL: http://svn.apache.org/viewvc?rev=998286&view=rev
Log:
merging changes from the trunk

Modified:
    hadoop/hdfs/branches/HDFS-1052/   (props changed)
    hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1052/build.xml   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/contrib/hdfsproxy/   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/java/   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java
    hadoop/hdfs/branches/HDFS-1052/src/webapps/datanode/   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/webapps/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1052/src/webapps/secondary/   (props changed)

Propchange: hadoop/hdfs/branches/HDFS-1052/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 17 19:51:27 2010
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs:713112
 /hadoop/hdfs/branches/HDFS-265:796829-820463
 /hadoop/hdfs/branches/branch-0.21:820487
-/hadoop/hdfs/trunk:987665-997035
+/hadoop/hdfs/trunk:987665-998256

Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Fri Sep 17 19:51:27 2010
@@ -131,6 +131,10 @@ Trunk (unreleased changes)
     HDFS-1395. Add @Override to FSDataset methods that implement
     FSDatasetInterface methods. (suresh)
 
+    HDFS-1383. Improve the error messages when using hftp://.  (szetszwo)
+
+    HDFS-1093. Change the FSNamesystem lock to a read/write lock. (dhruba)
+
   OPTIMIZATIONS
 
     HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)
@@ -1186,6 +1190,8 @@ Release 0.21.0 - Unreleased
 
     HDFS-1267. fuse-dfs does not compile. (Devaraj Das via tomwhite)
 
+    HDFS-1363. Eliminate second synchronized sections in appendFile(). (shv)
+
 Release 0.20.3 - Unreleased
 
   IMPROVEMENTS

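Most of the hunks below apply a single idiom from HDFS-1093: replace a synchronized block on the namesystem (or on rootDir) with an explicit acquire of a fair ReentrantReadWriteLock and a try/finally release. The following is a minimal, self-contained sketch of that idiom; the class and method names are illustrative, not code from this commit.

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch only: the migration from coarse synchronized blocks
// to an explicit fair read/write lock, as applied to FSNamesystem and
// FSDirectory in the hunks below.
class RwLockMigrationSketch {
  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); // fair

  void readLock()    { rwLock.readLock().lock(); }
  void readUnlock()  { rwLock.readLock().unlock(); }
  void writeLock()   { rwLock.writeLock().lock(); }
  void writeUnlock() { rwLock.writeLock().unlock(); }
  boolean hasWriteLock() { return rwLock.isWriteLockedByCurrentThread(); }

  // Before: synchronized (this) { mutateState(); }
  void mutatingOperation() {
    writeLock();
    try {
      // mutate shared state; finally guarantees release on any exception
    } finally {
      writeUnlock();
    }
  }

  // Read-only paths can now run concurrently with one another.
  long readOnlyOperation() {
    readLock();
    try {
      return 0L; // read shared state
    } finally {
      readUnlock();
    }
  }
}
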
Propchange: hadoop/hdfs/branches/HDFS-1052/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 17 19:51:27 2010
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/build.xml:779102
 /hadoop/hdfs/branches/HDFS-265/build.xml:796829-820463
 /hadoop/hdfs/branches/branch-0.21/build.xml:820487
-/hadoop/hdfs/trunk/build.xml:987665-997035
+/hadoop/hdfs/trunk/build.xml:987665-998256

Propchange: hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 17 19:51:27 2010
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
 /hadoop/core/trunk/src/c++/libhdfs:776175-784663
-/hadoop/hdfs/trunk/src/c++/libhdfs:987665-997035
+/hadoop/hdfs/trunk/src/c++/libhdfs:987665-998256

Propchange: hadoop/hdfs/branches/HDFS-1052/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 17 19:51:27 2010
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy:820487
-/hadoop/hdfs/trunk/src/contrib/hdfsproxy:987665-997035
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:987665-998256

Propchange: hadoop/hdfs/branches/HDFS-1052/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 17 19:51:27 2010
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/java:820487
-/hadoop/hdfs/trunk/src/java:987665-997035
+/hadoop/hdfs/trunk/src/java:987665-998256

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java Fri Sep 17 19:51:27 2010
@@ -89,29 +89,26 @@ class ByteRangeInputStream extends FSInp
         in = null;
       }
       
-      final URLOpener o;
-      
       // use the original url  if no resolved url exists (e.g., if it's 
       // the first time a request is made)
-      System.out.println("url: "+resolvedURL.getURL());
-      if (resolvedURL.getURL() == null) {
-        o = originalURL;
-      } else {
-        o = resolvedURL;
-      }
-        
+      final URLOpener o = resolvedURL.getURL() == null? originalURL: resolvedURL;
+
       final HttpURLConnection connection = o.openConnection();
-      connection.setRequestMethod("GET");
-      if (startPos != 0) {
-        connection.setRequestProperty("Range", "bytes="+startPos+"-");
-      }
-      connection.connect();
-      final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
-      filelength = cl == null? -1: Long.parseLong(cl);
-      if (HftpFileSystem.LOG.isDebugEnabled()) {
-        HftpFileSystem.LOG.debug("filelength = " + filelength);
+      try {
+        connection.setRequestMethod("GET");
+        if (startPos != 0) {
+          connection.setRequestProperty("Range", "bytes="+startPos+"-");
+        }
+        connection.connect();
+        final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
+        filelength = cl == null? -1: Long.parseLong(cl);
+        if (HftpFileSystem.LOG.isDebugEnabled()) {
+          HftpFileSystem.LOG.debug("filelength = " + filelength);
+        }
+        in = connection.getInputStream();
+      } catch(IOException ioe) {
+        HftpFileSystem.throwIOExceptionFromConnection(connection, ioe);
       }
-      in = connection.getInputStream();
       
       if (startPos != 0 && connection.getResponseCode() != 206) {
         // we asked for a byte range but did not receive a partial content

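Note the Range/206 check that survives at the end of this hunk: a server that ignores the Range header replies 200 with the whole file, which would silently corrupt a positioned read. Below is a standalone sketch of that request-and-verify sequence, assuming nothing beyond java.net; class and method names are illustrative, not from this commit.

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical sketch of the byte-range request ByteRangeInputStream makes:
// ask for "bytes=startPos-" and insist on 206 Partial Content, since a 200
// response would hand back the whole file starting at offset 0.
class RangeGetSketch {
  static InputStream openAt(URL url, long startPos) throws IOException {
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    if (startPos != 0) {
      conn.setRequestProperty("Range", "bytes=" + startPos + "-");
    }
    conn.connect();
    if (startPos != 0 && conn.getResponseCode() != HttpURLConnection.HTTP_PARTIAL) {
      throw new IOException("Server did not honor Range header: HTTP "
          + conn.getResponseCode());
    }
    return conn.getInputStream();
  }
}
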
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java Fri Sep 17 19:51:27 2010
@@ -269,6 +269,15 @@ public class HftpFileSystem extends File
     return ugiParamenter.toString();
   }
   
+  static Void throwIOExceptionFromConnection(
+      final HttpURLConnection connection, final IOException ioe
+      ) throws IOException {
+    final int code = connection.getResponseCode();
+    final String s = connection.getResponseMessage();
+    throw s == null? ioe:
+        new IOException(s + " (error code=" + code + ")", ioe);
+  }
+
   /**
    * Open an HTTP connection to the namenode to read file data and metadata.
    * @param path The path component of the URL
@@ -278,9 +287,13 @@ public class HftpFileSystem extends File
       throws IOException {
     query = updateQuery(query);
     final URL url = getNamenodeURL(path, query);
-    HttpURLConnection connection = (HttpURLConnection)url.openConnection();
-    connection.setRequestMethod("GET");
-    connection.connect();
+    final HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+    try {
+      connection.setRequestMethod("GET");
+      connection.connect();
+    } catch(IOException ioe) {
+      throwIOExceptionFromConnection(connection, ioe);
+    }
     return connection;
   }
 

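Both of the hunks above route connection failures through the new throwIOExceptionFromConnection helper, so an hftp client now sees the server's HTTP status line (HDFS-1383). A hypothetical standalone rendering of the same pattern, with illustrative names:

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

// Sketch (not commit code): fold the HTTP status line into the IOException
// raised during connect, mirroring throwIOExceptionFromConnection above.
// With it, a rejected request reads "Forbidden (error code=403)" rather
// than an opaque stack trace.
class HftpErrorSketch {
  static HttpURLConnection open(URL url) throws IOException {
    final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try {
      conn.setRequestMethod("GET");
      conn.connect();
    } catch (IOException ioe) {
      final int code = conn.getResponseCode();      // HTTP status, e.g. 403
      final String msg = conn.getResponseMessage(); // reason phrase, e.g. "Forbidden"
      throw msg == null ? ioe
          : new IOException(msg + " (error code=" + code + ")", ioe);
    }
    return conn;
  }
}
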
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java Fri Sep 17 19:51:27 2010
@@ -153,8 +153,11 @@ public class BackupStorage extends FSIma
         throw new IOException("Could not locate checkpoint directories");
       StorageDirectory sdName = itImage.next();
       StorageDirectory sdEdits = itEdits.next();
-      synchronized(getFSDirectoryRootLock()) { // load image under rootDir lock
+      getFSDirectoryRootLock().writeLock();
+      try { // load image under rootDir lock
         loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
+      } finally {
+        getFSDirectoryRootLock().writeUnlock();
       }
       loadFSEdits(sdEdits);
     }
@@ -172,8 +175,8 @@ public class BackupStorage extends FSIma
     saveNamespace(false);
   }
 
-  private Object getFSDirectoryRootLock() {
-    return getFSNamesystem().dir.rootDir;
+  private FSDirectory getFSDirectoryRootLock() {
+    return getFSNamesystem().dir;
   }
 
   static File getJSpoolDir(StorageDirectory sd) {

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Fri Sep 17 19:51:27 2010
@@ -349,19 +349,44 @@ public class BlockManager {
   }
 
   /**
-   * Convert the last block of the file to an under construction block.
+   * Convert the last block of the file to an under construction block.<p>
+   * The block is converted only if the file has blocks and the last one
+   * is a partial block (its size is less than the preferred block size).
+   * The converted block is returned to the client.
+   * The client uses the returned block locations to form the data pipeline
+   * for this block.<br>
+   * The method returns null if there is no partial block at the end.
+   * The client is supposed to allocate a new block with the next call.
+   *
    * @param fileINode file
-   * @param targets data-nodes that will form the pipeline for this block
+   * @return the last block locations if the block is partial or null otherwise
    */
-  void convertLastBlockToUnderConstruction(
-      INodeFileUnderConstruction fileINode,
-      DatanodeDescriptor[] targets) throws IOException {
+  LocatedBlock convertLastBlockToUnderConstruction(
+      INodeFileUnderConstruction fileINode) throws IOException {
     BlockInfo oldBlock = fileINode.getLastBlock();
-    if(oldBlock == null)
-      return;
+    if(oldBlock == null ||
+        fileINode.getPreferredBlockSize() == oldBlock.getNumBytes())
+      return null;
+    assert oldBlock == getStoredBlock(oldBlock) :
+      "last block of the file is not in blocksMap";
+
+    DatanodeDescriptor[] targets = getNodes(oldBlock);
+
     BlockInfoUnderConstruction ucBlock =
       fileINode.setLastBlock(oldBlock, targets);
     blocksMap.replaceBlock(ucBlock);
+
+    // Remove block from replication queue.
+    updateNeededReplications(oldBlock, 0, 0);
+
+    // remove this block from the list of pending blocks to be deleted. 
+    for (DatanodeDescriptor dd : targets) {
+      String datanodeId = dd.getStorageID();
+      removeFromInvalidates(datanodeId, oldBlock);
+    }
+
+    long fileLength = fileINode.computeContentSummary().getLength();
+    return getBlockLocation(ucBlock, fileLength - ucBlock.getNumBytes());
   }
 
   /**
@@ -383,7 +408,7 @@ public class BlockManager {
   }
 
   List<LocatedBlock> getBlockLocations(BlockInfo[] blocks, long offset,
-      long length, int nrBlocksToReturn, boolean needBlockToken) throws IOException {
+      long length, int nrBlocksToReturn) throws IOException {
     int curBlk = 0;
     long curPos = 0, blkSize = 0;
     int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
@@ -402,7 +427,7 @@ public class BlockManager {
     long endOff = offset + length;
     List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
     do {
-      results.add(getBlockLocation(blocks[curBlk], curPos, needBlockToken));
+      results.add(getBlockLocation(blocks[curBlk], curPos));
       curPos += blocks[curBlk].getNumBytes();
       curBlk++;
     } while (curPos < endOff 
@@ -413,12 +438,12 @@ public class BlockManager {
 
   /** @param needBlockToken 
    * @return a LocatedBlock for the given block */
-  LocatedBlock getBlockLocation(final BlockInfo blk, final long pos, boolean needBlockToken
+  LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
       ) throws IOException {
     if (!blk.isComplete()) {
       final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
       final DatanodeDescriptor[] locations = uc.getExpectedLocations();
-      return namesystem.createLocatedBlock(uc, locations, pos, false, needBlockToken);
+      return namesystem.createLocatedBlock(uc, locations, pos, false);
     }
 
     // get block locations
@@ -444,7 +469,7 @@ public class BlockManager {
           machines[j++] = d;
       }
     }
-    return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt, needBlockToken);    
+    return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt);
   }
 
   /**
@@ -720,7 +745,8 @@ public class BlockManager {
     for (int i = 0; i < UnderReplicatedBlocks.LEVEL; i++) {
       blocksToReplicate.add(new ArrayList<Block>());
     }
-    synchronized (namesystem) {
+    namesystem.writeLock();
+    try {
       synchronized (neededReplications) {
         if (neededReplications.size() == 0) {
           missingBlocksInCurIter = 0;
@@ -763,7 +789,9 @@ public class BlockManager {
           }
         } // end for
       } // end synchronized neededReplication
-    } // end synchronized namesystem
+    } finally {
+      namesystem.writeUnlock();
+    }
 
     return blocksToReplicate;
   }
@@ -781,7 +809,8 @@ public class BlockManager {
     INodeFile fileINode = null;
     int additionalReplRequired;
 
-    synchronized (namesystem) {
+    namesystem.writeLock();
+    try {
       synchronized (neededReplications) {
         // block should belong to a file
         fileINode = blocksMap.getINode(block);
@@ -828,6 +857,8 @@ public class BlockManager {
         }
 
       }
+    } finally {
+      namesystem.writeUnlock();
     }
 
     // choose replication targets: NOT HOLDING THE GLOBAL LOCK
@@ -839,7 +870,8 @@ public class BlockManager {
     if(targets.length == 0)
       return false;
 
-    synchronized (namesystem) {
+    namesystem.writeLock();
+    try {
       synchronized (neededReplications) {
         // Recheck since global lock was released
         // block should belong to a file
@@ -916,6 +948,8 @@ public class BlockManager {
           }
         }
       }
+    } finally {
+      namesystem.writeUnlock();
     }
 
     return true;
@@ -997,7 +1031,8 @@ public class BlockManager {
   void processPendingReplications() {
     Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
     if (timedOutItems != null) {
-      synchronized (namesystem) {
+      namesystem.writeLock();
+      try {
         for (int i = 0; i < timedOutItems.length; i++) {
           NumberReplicas num = countNodes(timedOutItems[i]);
           if (isNeededReplication(timedOutItems[i], getReplication(timedOutItems[i]),
@@ -1008,6 +1043,8 @@ public class BlockManager {
                                    getReplication(timedOutItems[i]));
           }
         }
+      } finally {
+        namesystem.writeUnlock();
       }
       /* If we know the target datanodes where the replication timedout,
        * we could invoke decBlocksScheduled() on it. Its ok for now.
@@ -1057,6 +1094,7 @@ public class BlockManager {
                                DatanodeDescriptor node,
                                DatanodeDescriptor delNodeHint)
   throws IOException {
+    assert (namesystem.hasWriteLock());
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
     if (storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to anyfile, then we are done.
@@ -1187,7 +1225,8 @@ public class BlockManager {
    */
   void processMisReplicatedBlocks() {
     long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
-    synchronized (namesystem) {
+    namesystem.writeLock();
+    try {
       neededReplications.clear();
       for (BlockInfo block : blocksMap.getBlocks()) {
         INodeFile fileINode = block.getINode();
@@ -1215,6 +1254,8 @@ public class BlockManager {
           processOverReplicatedBlock(block, expectedReplication, null, null);
         }
       }
+    } finally {
+      namesystem.writeUnlock();
     }
     FSNamesystem.LOG.info("Total number of blocks = " + blocksMap.size());
     FSNamesystem.LOG.info("Number of invalid blocks = " + nrInvalid);
@@ -1278,7 +1319,8 @@ public class BlockManager {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
           + block + " from " + node.getName());
     }
-    synchronized (namesystem) {
+    assert (namesystem.hasWriteLock());
+    {
       if (!blocksMap.removeNode(block, node)) {
         if(NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
@@ -1510,7 +1552,8 @@ public class BlockManager {
   /* updates a block in under replication queue */
   void updateNeededReplications(Block block, int curReplicasDelta,
       int expectedReplicasDelta) {
-    synchronized (namesystem) {
+    namesystem.writeLock();
+    try {
       NumberReplicas repl = countNodes(block);
       int curExpectedReplicas = getReplication(block);
       if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
@@ -1523,6 +1566,8 @@ public class BlockManager {
         neededReplications.remove(block, oldReplicas, repl.decommissionedReplicas(),
                                   oldExpectedReplicas);
       }
+    } finally {
+      namesystem.writeUnlock();
     }
   }
 
@@ -1565,7 +1610,8 @@ public class BlockManager {
    * @return number of blocks scheduled for removal during this iteration.
    */
   private int invalidateWorkForOneNode(String nodeId) {
-    synchronized (namesystem) {
+    namesystem.writeLock();
+    try {
       // blocks should not be replicated or removed if safe mode is on
       if (namesystem.isInSafeMode())
         return 0;
@@ -1610,6 +1656,8 @@ public class BlockManager {
       }
       pendingDeletionBlocksCount -= blocksToInvalidate.size();
       return blocksToInvalidate.size();
+    } finally {
+      namesystem.writeUnlock();
     }
   }
   
@@ -1700,8 +1748,11 @@ public class BlockManager {
   }
 
   int getCapacity() {
-    synchronized(namesystem) {
+    namesystem.readLock();
+    try {
       return blocksMap.getCapacity();
+    } finally {
+      namesystem.readUnlock();
     }
   }
   

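One detail in the BlockManager changes deserves a callout: computeReplicationWork still drops the global lock to choose replication targets ("NOT HOLDING THE GLOBAL LOCK") and then rechecks its preconditions after reacquiring it, because the namesystem may have changed in between. A self-contained sketch of that lock-drop-recheck shape, with all names hypothetical:

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch of the pattern in computeReplicationWork above:
// expensive target selection runs with the global lock released, so the
// precondition must be re-validated once the lock is reacquired.
class ReplicationWorkSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private volatile boolean needed = true;          // stand-in for neededReplications state

  boolean replicateOne() {
    lock.writeLock().lock();
    try {
      if (!needed) return false;                   // first check, under the lock
    } finally {
      lock.writeLock().unlock();
    }

    String[] targets = chooseTargets();            // slow work, NOT holding the lock
    if (targets.length == 0) return false;

    lock.writeLock().lock();
    try {
      if (!needed) return false;                   // recheck: state may have changed
      schedule(targets);
      return true;
    } finally {
      lock.writeLock().unlock();
    }
  }

  private String[] chooseTargets() { return new String[] {"dn1"}; }
  private void schedule(String[] targets) { needed = false; }
}
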
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java Fri Sep 17 19:51:27 2010
@@ -57,8 +57,11 @@ class DecommissionManager {
      */
     public void run() {
       for(; fsnamesystem.isRunning(); ) {
-        synchronized(fsnamesystem) {
+        fsnamesystem.writeLock();
+        try {
           check();
+        } finally {
+          fsnamesystem.writeUnlock();
         }
   
         try {
@@ -90,4 +93,4 @@ class DecommissionManager {
       }
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Sep 17 19:51:27 2010
@@ -24,6 +24,9 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -66,7 +69,32 @@ class FSDirectory implements Closeable {
   private volatile boolean ready = false;
   private static final long UNKNOWN_DISK_SPACE = -1;
   private final int lsLimit;  // max list limit
-  
+
+  // lock to protect BlockMap.
+  private ReentrantReadWriteLock bLock;
+  private Condition cond;
+
+  // utility methods to acquire and release read lock and write lock
+  void readLock() {
+    this.bLock.readLock().lock();
+  }
+
+  void readUnlock() {
+    this.bLock.readLock().unlock();
+  }
+
+  void writeLock() {
+    this.bLock.writeLock().lock();
+  }
+
+  void writeUnlock() {
+    this.bLock.writeLock().unlock();
+  }
+
+  boolean hasWriteLock() {
+    return this.bLock.isWriteLockedByCurrentThread();
+  }
+
   /**
    * Caches frequently used file names used in {@link INode} to reuse 
    * byte[] objects and reduce heap usage.
@@ -87,6 +115,8 @@ class FSDirectory implements Closeable {
   }
 
   FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
+    this.bLock = new ReentrantReadWriteLock(true); // fair
+    this.cond = bLock.writeLock().newCondition();
     fsImage.setFSNamesystem(ns);
     rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
         ns.createFsOwnerPermissions(new FsPermission((short)0755)),
@@ -134,10 +164,13 @@ class FSDirectory implements Closeable {
       fsImage.close();
       throw e;
     }
-    synchronized (this) {
+    writeLock();
+    try {
       this.ready = true;
       this.nameCache.initialized();
-      this.notifyAll();
+      cond.signalAll();
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -158,13 +191,16 @@ class FSDirectory implements Closeable {
    */
   void waitForReady() {
     if (!ready) {
-      synchronized (this) {
+      writeLock();
+      try {
         while (!ready) {
           try {
-            this.wait(5000);
+            cond.await(5000, TimeUnit.MILLISECONDS);
           } catch (InterruptedException ie) {
           }
         }
+      } finally {
+        writeUnlock();
       }
     }
   }
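The two hunks above move FSDirectory's readiness handshake from Object.wait/notifyAll to a Condition created from the write lock; await atomically releases the lock while parked and reacquires it before the ready flag is re-checked. A self-contained sketch of that handoff (names are illustrative; the real code swallows the interrupt, while this sketch restores the interrupt flag):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch only: a Condition bound to the write lock replaces wait/notifyAll,
// as in FSDirectory above. Conditions can only be taken from the write side
// of a ReentrantReadWriteLock.
class ReadySignalSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
  private final Condition readyCond = lock.writeLock().newCondition();
  private volatile boolean ready = false;

  void markReady() {
    lock.writeLock().lock();
    try {
      ready = true;
      readyCond.signalAll();   // wake every thread parked in waitForReady()
    } finally {
      lock.writeLock().unlock();
    }
  }

  void waitForReady() {
    if (ready) return;         // fast path, no lock
    lock.writeLock().lock();
    try {
      while (!ready) {
        try {
          readyCond.await(5000, TimeUnit.MILLISECONDS); // bounded wait, then re-check
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    } finally {
      lock.writeLock().unlock();
    }
  }
}
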
@@ -196,8 +232,11 @@ class FSDirectory implements Closeable {
                                  permissions,replication,
                                  preferredBlockSize, modTime, clientName, 
                                  clientMachine, clientNode);
-    synchronized (rootDir) {
+    writeLock();
+    try {
       newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE, false);
+    } finally {
+      writeUnlock();
     }
     if (newNode == null) {
       NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
@@ -234,7 +273,8 @@ class FSDirectory implements Closeable {
                               modificationTime, atime, preferredBlockSize);
       diskspace = ((INodeFile)newNode).diskspaceConsumed(blocks);
     }
-    synchronized (rootDir) {
+    writeLock();
+    try {
       try {
         newNode = addNode(path, newNode, diskspace, false);
         if(newNode != null && blocks != null) {
@@ -249,7 +289,10 @@ class FSDirectory implements Closeable {
         return null;
       }
       return newNode;
+    } finally {
+      writeUnlock();
     }
+
   }
 
   INodeDirectory addToParent( byte[][] src,
@@ -286,7 +329,8 @@ class FSDirectory implements Closeable {
     }
     // add new node to the parent
     INodeDirectory newParent = null;
-    synchronized (rootDir) {
+    writeLock();
+    try {
       try {
         newParent = rootDir.addToParent(src, newNode, parentINode,
                                         false, propagateModTime);
@@ -306,6 +350,8 @@ class FSDirectory implements Closeable {
           newF.setBlock(i, getBlockManager().addINode(blockInfo, newF));
         }
       }
+    } finally {
+      writeUnlock();
     }
     return newParent;
   }
@@ -320,7 +366,8 @@ class FSDirectory implements Closeable {
   ) throws QuotaExceededException {
     waitForReady();
 
-    synchronized (rootDir) {
+    writeLock();
+    try {
       assert inodes[inodes.length-1].isUnderConstruction() :
         "INode should correspond to a file under construction";
       INodeFileUnderConstruction fileINode = 
@@ -347,6 +394,8 @@ class FSDirectory implements Closeable {
             + "file system");
       }
       return blockInfo;
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -356,13 +405,16 @@ class FSDirectory implements Closeable {
   void persistBlocks(String path, INodeFileUnderConstruction file) {
     waitForReady();
 
-    synchronized (rootDir) {
+    writeLock();
+    try {
       fsImage.getEditLog().logOpenFile(path, file);
       if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
             +path+" with "+ file.getBlocks().length 
             +" blocks is persisted to the file system");
       }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -372,7 +424,8 @@ class FSDirectory implements Closeable {
   void closeFile(String path, INodeFile file) {
     waitForReady();
     long now = now();
-    synchronized (rootDir) {
+    writeLock();
+    try {
       // file is closed
       file.setModificationTimeForce(now);
       fsImage.getEditLog().logCloseFile(path, file);
@@ -381,6 +434,8 @@ class FSDirectory implements Closeable {
             +path+" with "+ file.getBlocks().length 
             +" blocks is persisted to the file system");
       }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -391,7 +446,8 @@ class FSDirectory implements Closeable {
                       Block block) throws IOException {
     waitForReady();
 
-    synchronized (rootDir) {
+    writeLock();
+    try {
       // modify file-> block and blocksMap
       fileNode.removeLastBlock(block);
       getBlockManager().removeBlockFromMap(block);
@@ -405,6 +461,8 @@ class FSDirectory implements Closeable {
             +path+" with "+block
             +" block is added to the file system");
       }
+    } finally {
+      writeUnlock();
     }
     return true;
   }
@@ -462,7 +520,8 @@ class FSDirectory implements Closeable {
   boolean unprotectedRenameTo(String src, String dst, long timestamp)
     throws QuotaExceededException, UnresolvedLinkException, 
     FileAlreadyExistsException {
-    synchronized (rootDir) {
+    writeLock();
+    try {
       INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
       INode srcInode = srcInodes[srcInodes.length-1];
       
@@ -560,6 +619,8 @@ class FSDirectory implements Closeable {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           +"failed to rename "+src+" to "+dst);
       return false;
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -586,7 +647,8 @@ class FSDirectory implements Closeable {
       }
     }
     String error = null;
-    synchronized (rootDir) {
+    writeLock();
+    try {
       final INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
       final INode srcInode = srcInodes[srcInodes.length - 1];
       // validate source
@@ -733,6 +795,8 @@ class FSDirectory implements Closeable {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + "failed to rename " + src + " to " + dst);
       throw new IOException("rename from " + src + " to " + dst + " failed.");
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -764,7 +828,9 @@ class FSDirectory implements Closeable {
     }
     oldReplication[0] = -1;
     Block[] fileBlocks = null;
-    synchronized(rootDir) {
+
+    writeLock();
+    try {
       INode[] inodes = rootDir.getExistingPathINodes(src, true);
       INode inode = inodes[inodes.length - 1];
       if (inode == null) {
@@ -784,6 +850,8 @@ class FSDirectory implements Closeable {
 
       fileNode.setReplication(replication);
       fileBlocks = fileNode.getBlocks();
+    } finally {
+      writeUnlock();
     }
     return fileBlocks;
   }
@@ -795,7 +863,8 @@ class FSDirectory implements Closeable {
    */
   long getPreferredBlockSize(String filename) throws UnresolvedLinkException,
       FileNotFoundException, IOException {
-    synchronized (rootDir) {
+    readLock();
+    try {
       INode inode = rootDir.getNode(filename, false);
       if (inode == null) {
         throw new FileNotFoundException("File does not exist: " + filename);
@@ -804,12 +873,15 @@ class FSDirectory implements Closeable {
         throw new IOException("Getting block size of non-file: "+ filename); 
       }
       return ((INodeFile)inode).getPreferredBlockSize();
+    } finally {
+      readUnlock();
     }
   }
 
   boolean exists(String src) throws UnresolvedLinkException {
     src = normalizePath(src);
-    synchronized(rootDir) {
+    readLock();
+    try {
       INode inode = rootDir.getNode(src, false);
       if (inode == null) {
          return false;
@@ -817,6 +889,8 @@ class FSDirectory implements Closeable {
       return inode.isDirectory() || inode.isLink() 
         ? true 
         : ((INodeFile)inode).getBlocks() != null;
+    } finally {
+      readUnlock();
     }
   }
 
@@ -828,12 +902,15 @@ class FSDirectory implements Closeable {
 
   void unprotectedSetPermission(String src, FsPermission permissions) 
     throws FileNotFoundException, UnresolvedLinkException {
-    synchronized(rootDir) {
+    writeLock();
+    try {
         INode inode = rootDir.getNode(src, true);
         if (inode == null) {
             throw new FileNotFoundException("File does not exist: " + src);
         }
         inode.setPermission(permissions);
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -845,7 +922,8 @@ class FSDirectory implements Closeable {
 
   void unprotectedSetOwner(String src, String username, String groupname) 
     throws FileNotFoundException, UnresolvedLinkException {
-    synchronized(rootDir) {
+    writeLock();
+    try {
       INode inode = rootDir.getNode(src, true);
       if (inode == null) {
           throw new FileNotFoundException("File does not exist: " + src);
@@ -856,6 +934,8 @@ class FSDirectory implements Closeable {
       if (groupname != null) {
         inode.setGroup(groupname);
       }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -864,13 +944,16 @@ class FSDirectory implements Closeable {
    */
   public void concatInternal(String target, String [] srcs) 
       throws UnresolvedLinkException {
-    synchronized(rootDir) {
+    writeLock();
+    try {
       // actual move
       waitForReady();
 
       unprotectedConcat(target, srcs);
       // do the commit
       fsImage.getEditLog().logConcat(target, srcs, now());
+    } finally {
+      writeUnlock();
     }
   }
   
@@ -952,12 +1035,15 @@ class FSDirectory implements Closeable {
     if (!isDir(src)) {
       return true;
     }
-    synchronized(rootDir) {
+    readLock();
+    try {
       INode targetNode = rootDir.getNode(src, false);
       assert targetNode != null : "should be taken care in isDir() above";
       if (((INodeDirectory)targetNode).getChildren().size() != 0) {
         dirNotEmpty = false;
       }
+    } finally {
+      readUnlock();
     }
     return dirNotEmpty;
   }
@@ -1004,7 +1090,8 @@ class FSDirectory implements Closeable {
       long mtime) throws UnresolvedLinkException {
     src = normalizePath(src);
 
-    synchronized (rootDir) {
+    writeLock();
+    try {
       INode[] inodes =  rootDir.getExistingPathINodes(src, false);
       INode targetNode = inodes[inodes.length-1];
 
@@ -1035,6 +1122,8 @@ class FSDirectory implements Closeable {
             +src+" is removed");
       }
       return filesRemoved;
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -1052,7 +1141,8 @@ class FSDirectory implements Closeable {
   private void replaceNode(String path, INodeFile oldnode, INodeFile newnode,
                            boolean updateDiskspace) 
       throws IOException, UnresolvedLinkException {    
-    synchronized (rootDir) {
+    writeLock();
+    try {
       long dsOld = oldnode.diskspaceConsumed();
       
       //
@@ -1089,6 +1179,8 @@ class FSDirectory implements Closeable {
         newnode.setBlock(index, info); // inode refers to the block in BlocksMap
         index++;
       }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -1104,7 +1196,8 @@ class FSDirectory implements Closeable {
       boolean needLocation) throws UnresolvedLinkException, IOException {
     String srcs = normalizePath(src);
 
-    synchronized (rootDir) {
+    readLock();
+    try {
       INode targetNode = rootDir.getNode(srcs, true);
       if (targetNode == null)
         return null;
@@ -1126,6 +1219,8 @@ class FSDirectory implements Closeable {
       }
       return new DirectoryListing(
           listing, totalNumChildren-startChild-numOfListing);
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1138,7 +1233,8 @@ class FSDirectory implements Closeable {
   HdfsFileStatus getFileInfo(String src, boolean resolveLink) 
       throws UnresolvedLinkException {
     String srcs = normalizePath(src);
-    synchronized (rootDir) {
+    readLock();
+    try {
       INode targetNode = rootDir.getNode(srcs, resolveLink);
       if (targetNode == null) {
         return null;
@@ -1146,6 +1242,8 @@ class FSDirectory implements Closeable {
       else {
         return createFileStatus(HdfsFileStatus.EMPTY_NAME, targetNode);
       }
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1154,7 +1252,8 @@ class FSDirectory implements Closeable {
    */
   Block[] getFileBlocks(String src) throws UnresolvedLinkException {
     waitForReady();
-    synchronized (rootDir) {
+    readLock();
+    try {
       INode targetNode = rootDir.getNode(src, false);
       if (targetNode == null)
         return null;
@@ -1163,6 +1262,8 @@ class FSDirectory implements Closeable {
       if (targetNode.isLink()) 
         return null;
       return ((INodeFile)targetNode).getBlocks();
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1170,12 +1271,15 @@ class FSDirectory implements Closeable {
    * Get {@link INode} associated with the file.
    */
   INodeFile getFileINode(String src) throws UnresolvedLinkException {
-    synchronized (rootDir) {
+    readLock();
+    try {
       INode inode = rootDir.getNode(src, true);
       if (inode == null || inode.isDirectory())
         return null;
       assert !inode.isLink();      
       return (INodeFile)inode;
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1193,8 +1297,11 @@ class FSDirectory implements Closeable {
    */
   INode[] getExistingPathINodes(String path) 
     throws UnresolvedLinkException {
-    synchronized (rootDir){
+    readLock();
+    try {
       return rootDir.getExistingPathINodes(path, true);
+    } finally {
+      readUnlock();
     }
   }
   
@@ -1203,7 +1310,8 @@ class FSDirectory implements Closeable {
    */
   boolean isValidToCreate(String src) throws UnresolvedLinkException {
     String srcs = normalizePath(src);
-    synchronized (rootDir) {
+    readLock();
+    try {
       if (srcs.startsWith("/") && 
           !srcs.endsWith("/") && 
           rootDir.getNode(srcs, false) == null) {
@@ -1211,6 +1319,8 @@ class FSDirectory implements Closeable {
       } else {
         return false;
       }
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1218,9 +1328,12 @@ class FSDirectory implements Closeable {
    * Check whether the path specifies a directory
    */
   boolean isDir(String src) throws UnresolvedLinkException {
-    synchronized (rootDir) {
+    readLock();
+    try {
       INode node = rootDir.getNode(normalizePath(src), false);
       return node != null && node.isDirectory();
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1237,7 +1350,8 @@ class FSDirectory implements Closeable {
                                          throws QuotaExceededException,
                                                 FileNotFoundException,
                                                 UnresolvedLinkException {
-    synchronized (rootDir) {
+    writeLock();
+    try {
       INode[] inodes = rootDir.getExistingPathINodes(path, false);
       int len = inodes.length;
       if (inodes[len - 1] == null) {
@@ -1245,6 +1359,8 @@ class FSDirectory implements Closeable {
                                         " does not exist under rootDir.");
       }
       updateCount(inodes, len-1, nsDelta, dsDelta, true);
+    } finally {
+      writeUnlock();
     }
   }
   
@@ -1360,7 +1476,8 @@ class FSDirectory implements Closeable {
     byte[][] components = INode.getPathComponents(names);
     INode[] inodes = new INode[components.length];
 
-    synchronized(rootDir) {
+    writeLock();
+    try {
       rootDir.getExistingPathINodes(components, inodes, false);
 
       // find the index of the first null in inodes[]
@@ -1393,6 +1510,8 @@ class FSDirectory implements Closeable {
               "DIR* FSDirectory.mkdirs: created directory " + cur);
         }
       }
+    } finally {
+      writeUnlock();
     }
     return true;
   }
@@ -1404,11 +1523,14 @@ class FSDirectory implements Closeable {
                           UnresolvedLinkException {
     byte[][] components = INode.getPathComponents(src);
     INode[] inodes = new INode[components.length];
-    synchronized (rootDir) {
+    writeLock();
+    try {
       rootDir.getExistingPathINodes(components, inodes, false);
       unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
           permissions, false, timestamp);
       return inodes[inodes.length-1];
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -1435,10 +1557,13 @@ class FSDirectory implements Closeable {
     child.setLocalName(path);
     cacheName(child);
     INode[] inodes = new INode[components.length];
-    synchronized (rootDir) {
+    writeLock();
+    try {
       rootDir.getExistingPathINodes(components, inodes, false);
       return addChild(inodes, inodes.length-1, child, childDiskspace,
                       inheritPermission);
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -1597,7 +1722,8 @@ class FSDirectory implements Closeable {
   ContentSummary getContentSummary(String src) 
     throws FileNotFoundException, UnresolvedLinkException {
     String srcs = normalizePath(src);
-    synchronized (rootDir) {
+    readLock();
+    try {
       INode targetNode = rootDir.getNode(srcs, false);
       if (targetNode == null) {
         throw new FileNotFoundException("File does not exist: " + srcs);
@@ -1605,6 +1731,8 @@ class FSDirectory implements Closeable {
       else {
         return targetNode.computeContentSummary();
       }
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1710,7 +1838,8 @@ class FSDirectory implements Closeable {
     
     String srcs = normalizePath(src);
 
-    synchronized(rootDir) {
+    writeLock();
+    try {
       INode[] inodes = rootDir.getExistingPathINodes(src, true);
       INode targetNode = inodes[inodes.length-1];
       if (targetNode == null) {
@@ -1744,6 +1873,8 @@ class FSDirectory implements Closeable {
         }
         return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
       }
+    } finally {
+      writeUnlock();
     }
   }
   
@@ -1755,18 +1886,24 @@ class FSDirectory implements Closeable {
   void setQuota(String src, long nsQuota, long dsQuota) 
     throws FileNotFoundException, QuotaExceededException,
     UnresolvedLinkException { 
-    synchronized (rootDir) {    
+    writeLock();
+    try {
       INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
       if (dir != null) {
         fsImage.getEditLog().logSetQuota(src, dir.getNsQuota(), 
                                          dir.getDsQuota());
       }
+    } finally {
+      writeUnlock();
     }
   }
   
   long totalInodes() {
-    synchronized (rootDir) {
+    readLock();
+    try {
       return rootDir.numItemsInTree();
+    } finally {
+      readUnlock();
     }
   }
 
@@ -1781,9 +1918,12 @@ class FSDirectory implements Closeable {
 
   boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
     throws UnresolvedLinkException {
-    synchronized(rootDir) {
+    writeLock();
+    try {
       INodeFile inode = getFileINode(src);
       return unprotectedSetTimes(src, inode, mtime, atime, force);
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -1940,8 +2080,11 @@ class FSDirectory implements Closeable {
       throws UnresolvedLinkException {
     INodeSymlink newNode = new INodeSymlink(target, modTime, atime, perm);
     try {
-      synchronized (rootDir) {
+      writeLock();
+      try {
         newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE, false);
+      } finally {
+        writeUnlock();
       }
     } catch (UnresolvedLinkException e) {
       /* All UnresolvedLinkExceptions should have been resolved by now, but we