You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/08/23 01:14:24 UTC

svn commit: r1160493 - in /hadoop/common/trunk/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/

Author: szetszwo
Date: Mon Aug 22 23:14:24 2011
New Revision: 1160493

URL: http://svn.apache.org/viewvc?rev=1160493&view=rev
Log:
HDFS-2266.  Add Namesystem and SafeMode interfaces to avoid directly referring to FSNamesystem in BlockManager.

Added:
    hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
    hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
Modified:
    hadoop/common/trunk/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

Modified: hadoop/common/trunk/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/CHANGES.txt?rev=1160493&r1=1160492&r2=1160493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs/CHANGES.txt Mon Aug 22 23:14:24 2011
@@ -676,6 +676,9 @@ Trunk (unreleased changes)
     HDFS-2273.  Refactor BlockManager.recentInvalidateSets to a new class.
     (szetszwo)
 
+    HDFS-2266.  Add Namesystem and SafeMode interfaces to avoid directly
+    referring to FSNamesystem in BlockManager. (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1160493&r1=1160492&r2=1160493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Mon Aug 22 23:14:24 2011
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -77,7 +78,7 @@ public class BlockManager {
   /** Default load factor of map */
   public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
 
-  private final FSNamesystem namesystem;
+  private final Namesystem namesystem;
 
   private final DatanodeManager datanodeManager;
   private final HeartbeatManager heartbeatManager;
@@ -178,7 +179,7 @@ public class BlockManager {
 
     blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
     blockplacement = BlockPlacementPolicy.getInstance(
-        conf, namesystem, datanodeManager.getNetworkTopology());
+        conf, fsn, datanodeManager.getNetworkTopology());
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
@@ -374,23 +375,21 @@ public class BlockManager {
   /**
    * Commit a block of a file
    * 
-   * @param fileINode file inode
    * @param block block to be committed
    * @param commitBlock - contains client reported block length and generation
+   * @return true if the block is changed to committed state.
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
    */
-  private void commitBlock(INodeFileUnderConstruction fileINode,
-                       BlockInfoUnderConstruction block,
-                       Block commitBlock) throws IOException {
+  private boolean commitBlock(final BlockInfoUnderConstruction block,
+      final Block commitBlock) throws IOException {
     if (block.getBlockUCState() == BlockUCState.COMMITTED)
-      return;
+      return false;
     assert block.getNumBytes() <= commitBlock.getNumBytes() :
       "commitBlock length is less than the stored one "
       + commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
     block.commitBlock(commitBlock);
-
-    namesystem.updateDiskSpaceConsumed(fileINode, commitBlock);
+    return true;
   }
   
   /**
@@ -399,24 +398,24 @@ public class BlockManager {
    * 
    * @param fileINode file inode
    * @param commitBlock - contains client reported block length and generation
+   * @return true if the last block is changed to committed state.
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
    */
-  public void commitOrCompleteLastBlock(INodeFileUnderConstruction fileINode, 
+  public boolean commitOrCompleteLastBlock(INodeFileUnderConstruction fileINode, 
       Block commitBlock) throws IOException {
-    
     if(commitBlock == null)
-      return; // not committing, this is a block allocation retry
+      return false; // not committing, this is a block allocation retry
     BlockInfo lastBlock = fileINode.getLastBlock();
     if(lastBlock == null)
-      return; // no blocks in file yet
+      return false; // no blocks in file yet
     if(lastBlock.isComplete())
-      return; // already completed (e.g. by syncBlock)
+      return false; // already completed (e.g. by syncBlock)
     
-    commitBlock(fileINode, (BlockInfoUnderConstruction)lastBlock, commitBlock);
-
+    final boolean b = commitBlock((BlockInfoUnderConstruction)lastBlock, commitBlock);
     if(countNodes(lastBlock).liveReplicas() >= minReplication)
       completeBlock(fileINode,fileINode.numBlocks()-1);
+    return b;
   }
 
   /**
@@ -426,8 +425,8 @@ public class BlockManager {
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
    */
-  BlockInfo completeBlock(INodeFile fileINode, int blkIndex)
-  throws IOException {
+  private BlockInfo completeBlock(final INodeFile fileINode,
+      final int blkIndex) throws IOException {
     if(blkIndex < 0)
       return null;
     BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
@@ -444,8 +443,8 @@ public class BlockManager {
     return blocksMap.replaceBlock(completeBlock);
   }
 
-  BlockInfo completeBlock(INodeFile fileINode, BlockInfo block)
-  throws IOException {
+  private BlockInfo completeBlock(final INodeFile fileINode,
+      final BlockInfo block) throws IOException {
     BlockInfo[] fileBlocks = fileINode.getBlocks();
     for(int idx = 0; idx < fileBlocks.length; idx++)
       if(fileBlocks[idx] == block) {
@@ -491,8 +490,9 @@ public class BlockManager {
       invalidateBlocks.remove(datanodeId, oldBlock);
     }
 
-    long fileLength = fileINode.computeContentSummary().getLength();
-    return createLocatedBlock(ucBlock, fileLength - ucBlock.getNumBytes());
+    final long fileLength = fileINode.computeContentSummary().getLength();
+    final long pos = fileLength - ucBlock.getNumBytes();
+    return createLocatedBlock(ucBlock, pos, AccessMode.WRITE);
   }
 
   /**
@@ -513,8 +513,8 @@ public class BlockManager {
   }
 
   private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
-      final long offset, final long length, final int nrBlocksToReturn
-      ) throws IOException {
+      final long offset, final long length, final int nrBlocksToReturn,
+      final AccessMode mode) throws IOException {
     int curBlk = 0;
     long curPos = 0, blkSize = 0;
     int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
@@ -533,7 +533,7 @@ public class BlockManager {
     long endOff = offset + length;
     List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
     do {
-      results.add(createLocatedBlock(blocks[curBlk], curPos));
+      results.add(createLocatedBlock(blocks[curBlk], curPos, mode));
       curPos += blocks[curBlk].getNumBytes();
       curBlk++;
     } while (curPos < endOff 
@@ -542,6 +542,15 @@ public class BlockManager {
     return results;
   }
 
+  private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos,
+    final BlockTokenSecretManager.AccessMode mode) throws IOException {
+    final LocatedBlock lb = createLocatedBlock(blk, pos);
+    if (mode != null) {
+      setBlockToken(lb, mode);
+    }
+    return lb;
+  }
+
   /** @return a LocatedBlock for the given block */
   private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
       ) throws IOException {
@@ -600,21 +609,15 @@ public class BlockManager {
       if (LOG.isDebugEnabled()) {
         LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
       }
+      final AccessMode mode = needBlockToken? AccessMode.READ: null;
       final List<LocatedBlock> locatedblocks = createLocatedBlockList(
-          blocks, offset, length, Integer.MAX_VALUE);
+          blocks, offset, length, Integer.MAX_VALUE, mode);
 
       final BlockInfo last = blocks[blocks.length - 1];
       final long lastPos = last.isComplete()?
           fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
           : fileSizeExcludeBlocksUnderConstruction;
-      final LocatedBlock lastlb = createLocatedBlock(last, lastPos);
-
-      if (isBlockTokenEnabled() && needBlockToken) {
-        for(LocatedBlock lb : locatedblocks) {
-          setBlockToken(lb, AccessMode.READ);
-        }
-        setBlockToken(lastlb, AccessMode.READ);
-      }
+      final LocatedBlock lastlb = createLocatedBlock(last, lastPos, mode);
       return new LocatedBlocks(
           fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
           locatedblocks, lastlb, last.isComplete());

Modified: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1160493&r1=1160492&r2=1160493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Aug 22 23:14:24 2011
@@ -113,7 +113,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.Server;
@@ -149,7 +148,7 @@ import org.mortbay.util.ajax.JSON;
  ***************************************************/
 @InterfaceAudience.Private
 @Metrics(context="dfs")
-public class FSNamesystem implements RwLock, FSClusterStats,
+public class FSNamesystem implements Namesystem, FSClusterStats,
     FSNamesystemMBean, NameNodeMXBean {
   static final Log LOG = LogFactory.getLog(FSNamesystem.class);
 
@@ -517,7 +516,7 @@ public class FSNamesystem implements RwL
     }
   }
 
-  /** Is this name system running? */
+  @Override
   public boolean isRunning() {
     return fsRunning;
   }
@@ -1200,13 +1199,7 @@ public class FSNamesystem implements RwL
         leaseManager.addLease(cons.getClientName(), src);
 
         // convert last block to under-construction
-        LocatedBlock lb = 
-          blockManager.convertLastBlockToUnderConstruction(cons);
-
-        if (lb != null) {
-          blockManager.setBlockToken(lb, AccessMode.WRITE);
-        }
-        return lb;
+        return blockManager.convertLastBlockToUnderConstruction(cons);
       } else {
        // Now we can add the name to the filesystem. This file has no
        // blocks associated with it.
@@ -1443,8 +1436,7 @@ public class FSNamesystem implements RwL
       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
 
       // commit the last block and complete it if it has minimum replicas
-      blockManager.commitOrCompleteLastBlock(pendingFile, ExtendedBlock
-          .getLocalBlock(previous));
+      commitOrCompleteLastBlock(pendingFile, ExtendedBlock.getLocalBlock(previous));
 
       //
       // If we fail this, bad things happen!
@@ -1643,7 +1635,7 @@ public class FSNamesystem implements RwL
 
     INodeFileUnderConstruction pendingFile = checkLease(src, holder);
     // commit the last block and complete it if it has minimum replicas
-    blockManager.commitOrCompleteLastBlock(pendingFile, last);
+    commitOrCompleteLastBlock(pendingFile, last);
 
     if (!checkFileProgress(pendingFile, true)) {
       return false;
@@ -2256,10 +2248,12 @@ public class FSNamesystem implements RwL
     return leaseManager.reassignLease(lease, src, newHolder);
   }
 
-  /** Update disk space consumed. */
-  public void updateDiskSpaceConsumed(final INodeFileUnderConstruction fileINode,
+  private void commitOrCompleteLastBlock(final INodeFileUnderConstruction fileINode,
       final Block commitBlock) throws IOException {
     assert hasWriteLock();
+    if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
+      return;
+    }
 
     // Adjust disk space consumption if required
     final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();    
@@ -2366,7 +2360,7 @@ public class FSNamesystem implements RwL
       src = leaseManager.findPath(pendingFile);
       if (closeFile) {
         // commit the last block and complete it if it has minimum replicas
-        blockManager.commitOrCompleteLastBlock(pendingFile, storedBlock);
+        commitOrCompleteLastBlock(pendingFile, storedBlock);
 
         //remove lease, close file
         finalizeINodeFileUnderConstruction(src, pendingFile);
@@ -2806,7 +2800,7 @@ public class FSNamesystem implements RwL
      *  
      * @param conf configuration
      */
-    SafeModeInfo(Configuration conf) {
+    private SafeModeInfo(Configuration conf) {
       this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
           DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
       this.datanodeThreshold = conf.getInt(
@@ -2850,7 +2844,7 @@ public class FSNamesystem implements RwL
      * Check if safe mode is on.
      * @return true if in safe mode
      */
-    synchronized boolean isOn() {
+    private synchronized boolean isOn() {
       try {
         assert isConsistent() : " SafeMode: Inconsistent filesystem state: "
           + "Total num of blocks, active blocks, or "
@@ -2864,14 +2858,14 @@ public class FSNamesystem implements RwL
     /**
      * Check if we are populating replication queues.
      */
-    synchronized boolean isPopulatingReplQueues() {
+    private synchronized boolean isPopulatingReplQueues() {
       return initializedReplQueues;
     }
 
     /**
      * Enter safe mode.
      */
-    void enter() {
+    private void enter() {
       this.reached = 0;
     }
       
@@ -2881,7 +2875,7 @@ public class FSNamesystem implements RwL
      * Switch to manual safe mode if distributed upgrade is required.<br>
      * Check for invalid, under- & over-replicated blocks in the end of startup.
      */
-    synchronized void leave(boolean checkForUpgrades) {
+    private synchronized void leave(boolean checkForUpgrades) {
       if(checkForUpgrades) {
         // verify whether a distributed upgrade needs to be started
         boolean needUpgrade = false;
@@ -2921,7 +2915,7 @@ public class FSNamesystem implements RwL
     /**
      * Initialize replication queues.
      */
-    synchronized void initializeReplQueues() {
+    private synchronized void initializeReplQueues() {
       LOG.info("initializing replication queues");
       if (isPopulatingReplQueues()) {
         LOG.warn("Replication queues already initialized.");
@@ -2939,7 +2933,7 @@ public class FSNamesystem implements RwL
      * Check whether we have reached the threshold for 
      * initializing replication queues.
      */
-    synchronized boolean canInitializeReplQueues() {
+    private synchronized boolean canInitializeReplQueues() {
       return blockSafe >= blockReplQueueThreshold;
     }
       
@@ -2949,7 +2943,7 @@ public class FSNamesystem implements RwL
      * the extension time have passed.
      * @return true if can leave or false otherwise.
      */
-    synchronized boolean canLeave() {
+    private synchronized boolean canLeave() {
       if (reached == 0)
         return false;
       if (now() - reached < extension) {
@@ -2963,7 +2957,7 @@ public class FSNamesystem implements RwL
      * There is no need to enter safe mode 
      * if DFS is empty or {@link #threshold} == 0
      */
-    boolean needEnter() {
+    private boolean needEnter() {
       return (threshold != 0 && blockSafe < blockThreshold) ||
         (getNumLiveDataNodes() < datanodeThreshold) ||
         (!nameNodeHasResourcesAvailable());
@@ -3007,7 +3001,7 @@ public class FSNamesystem implements RwL
     /**
      * Set total number of blocks.
      */
-    synchronized void setBlockTotal(int total) {
+    private synchronized void setBlockTotal(int total) {
       this.blockTotal = total;
       this.blockThreshold = (int) (blockTotal * threshold);
       this.blockReplQueueThreshold = 
@@ -3020,7 +3014,7 @@ public class FSNamesystem implements RwL
      * reached minimal replication.
      * @param replication current replication 
      */
-    synchronized void incrementSafeBlockCount(short replication) {
+    private synchronized void incrementSafeBlockCount(short replication) {
       if ((int)replication == safeReplication)
         this.blockSafe++;
       checkMode();
@@ -3031,7 +3025,7 @@ public class FSNamesystem implements RwL
      * fallen below minimal replication.
      * @param replication current replication 
      */
-    synchronized void decrementSafeBlockCount(short replication) {
+    private synchronized void decrementSafeBlockCount(short replication) {
       if (replication == safeReplication-1)
         this.blockSafe--;
       checkMode();
@@ -3041,28 +3035,28 @@ public class FSNamesystem implements RwL
      * Check if safe mode was entered manually or automatically (at startup, or
      * when disk space is low).
      */
-    boolean isManual() {
+    private boolean isManual() {
       return extension == Integer.MAX_VALUE && !resourcesLow;
     }
 
     /**
      * Set manual safe mode.
      */
-    synchronized void setManual() {
+    private synchronized void setManual() {
       extension = Integer.MAX_VALUE;
     }
 
     /**
      * Check if safe mode was entered due to resources being low.
      */
-    boolean areResourcesLow() {
+    private boolean areResourcesLow() {
       return resourcesLow;
     }
 
     /**
      * Set that resources are low for this instance of safe mode.
      */
-    void setResourcesLow() {
+    private void setResourcesLow() {
       resourcesLow = true;
     }
 
@@ -3139,9 +3133,7 @@ public class FSNamesystem implements RwL
       lastStatusReport = curTime;
     }
 
-    /**
-     * Returns printable state of the class.
-     */
+    @Override
     public String toString() {
       String resText = "Current safe blocks = " 
         + blockSafe 
@@ -3156,7 +3148,7 @@ public class FSNamesystem implements RwL
      * Checks consistency of the class state.
      * This is costly and currently called only in assert.
      */
-    boolean isConsistent() throws IOException {
+    private boolean isConsistent() throws IOException {
       if (blockTotal == -1 && blockSafe == -1) {
         return true; // manual safe mode
       }
@@ -3215,7 +3207,7 @@ public class FSNamesystem implements RwL
     return isInSafeMode();
   }
 
-  /** Check and trigger safe mode. */
+  @Override
   public void checkSafeMode() {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
@@ -3224,10 +3216,7 @@ public class FSNamesystem implements RwL
     }
   }
 
-  /**
-   * Check whether the name node is in safe mode.
-   * @return true if safe mode is ON, false otherwise
-   */
+  @Override
   public boolean isInSafeMode() {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
@@ -3235,10 +3224,8 @@ public class FSNamesystem implements RwL
       return false;
     return safeMode.isOn();
   }
-  
-  /**
-   * Check whether the name node is in startup mode.
-   */
+
+  @Override
   public boolean isInStartupSafeMode() {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
@@ -3247,9 +3234,7 @@ public class FSNamesystem implements RwL
     return !safeMode.isManual() && safeMode.isOn();
   }
 
-  /**
-   * Check whether replication queues are populated.
-   */
+  @Override
   public boolean isPopulatingReplQueues() {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
@@ -3258,10 +3243,7 @@ public class FSNamesystem implements RwL
     return safeMode.isPopulatingReplQueues();
   }
     
-  /**
-   * Increment number of blocks that reached minimal replication.
-   * @param replication current replication 
-   */
+  @Override
   public void incrementSafeBlockCount(int replication) {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
@@ -3270,9 +3252,7 @@ public class FSNamesystem implements RwL
     safeMode.incrementSafeBlockCount((short)replication);
   }
 
-  /**
-   * Decrement number of blocks that reached minimal replication.
-   */
+  @Override
   public void decrementSafeBlockCount(Block b) {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
@@ -3397,10 +3377,6 @@ public class FSNamesystem implements RwL
     }
   }
 
-  public long getTransactionID() {
-    return getEditLog().getSyncTxId();
-  }
-
   CheckpointSignature rollEditLog() throws IOException {
     writeLock();
     try {
@@ -3494,7 +3470,7 @@ public class FSNamesystem implements RwL
     return checkPermission(path, false, null, null, null, null);
   }
 
-  /** Check if the user has superuser privilege. */
+  @Override
   public void checkSuperuserPrivilege() throws AccessControlException {
     if (isPermissionEnabled) {
       FSPermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup);
@@ -3916,10 +3892,6 @@ public class FSNamesystem implements RwL
     }
   }
 
-  public int numCorruptReplicas(Block blk) {
-    return blockManager.numCorruptReplicas(blk);
-  }
-
   static class CorruptFileBlockInfo {
     String path;
     Block block;

Modified: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1160493&r1=1160492&r2=1160493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Mon Aug 22 23:14:24 2011
@@ -1050,7 +1050,7 @@ public class NameNode implements Namenod
 
   @Override // NamenodeProtocol
   public long getTransactionID() {
-    return namesystem.getTransactionID();
+    return namesystem.getEditLog().getSyncTxId();
   }
 
   @Override // NamenodeProtocol

Added: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java?rev=1160493&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java (added)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java Mon Aug 22 23:14:24 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.util.RwLock;
+import org.apache.hadoop.security.AccessControlException;
+
+/** Namesystem operations. */
+@InterfaceAudience.Private
+public interface Namesystem extends RwLock, SafeMode {
+  /** Is this name system running? */
+  public boolean isRunning();
+
+  /** Check if the user has superuser privilege. */
+  public void checkSuperuserPrivilege() throws AccessControlException;
+
+  /** @return the block pool ID */
+  public String getBlockPoolId();
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java?rev=1160493&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java (added)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java Mon Aug 22 23:14:24 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/** SafeMode related operations. */
+@InterfaceAudience.Private
+public interface SafeMode {
+  /**
+   * Check safe mode conditions.
+   * If the corresponding conditions are satisfied,
+   * trigger the system to enter/leave safe mode.
+   */
+  public void checkSafeMode();
+
+  /** Is the system in safe mode? */
+  public boolean isInSafeMode();
+
+  /**
+   * Is the system in startup safe mode, i.e. the system is starting up with
+   * safe mode turned on automatically?
+   */
+  public boolean isInStartupSafeMode();
+
+  /** Check whether replication queues are being populated. */
+  public boolean isPopulatingReplQueues();
+    
+  /**
+   * Increment number of blocks that reached minimal replication.
+   * @param replication current replication 
+   */
+  public void incrementSafeBlockCount(int replication);
+
+  /** Decrement number of blocks that reached minimal replication. */
+  public void decrementSafeBlockCount(Block b);
+}

Modified: hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1160493&r1=1160492&r2=1160493&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Mon Aug 22 23:14:24 2011
@@ -303,7 +303,7 @@ public class DFSTestUtil {
     }
   }
 
-  /*
+  /**
    * Keep accessing the given file until the namenode reports that the
    * given block in the file contains the given number of corrupt replicas.
    */
@@ -312,7 +312,7 @@ public class DFSTestUtil {
       throws IOException, TimeoutException {
     int count = 0;
     final int ATTEMPTS = 50;
-    int repls = ns.numCorruptReplicas(b.getLocalBlock());
+    int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
     while (repls != corruptRepls && count < ATTEMPTS) {
       try {
         IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
@@ -321,7 +321,7 @@ public class DFSTestUtil {
         // Swallow exceptions
       }
       System.out.println("Waiting for "+corruptRepls+" corrupt replicas");
-      repls = ns.numCorruptReplicas(b.getLocalBlock());
+      repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
       count++;
     }
     if (count == ATTEMPTS) {