You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2015/09/07 17:46:47 UTC

[45/50] [abbrv] hadoop git commit: HDFS-8984. Move replication queues related methods in FSNamesystem to BlockManager. Contributed by Haohui Mai.

HDFS-8984. Move replication queues related methods in FSNamesystem to BlockManager. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/715b9c64
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/715b9c64
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/715b9c64

Branch: refs/heads/YARN-3926
Commit: 715b9c649982bff91d1f9eae656ba3b82178e1a3
Parents: 8928729
Author: Haohui Mai <wh...@apache.org>
Authored: Fri Sep 4 11:39:58 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Sep 4 11:45:31 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 ++
 .../server/blockmanagement/BlockManager.java    | 54 +++++++++++++++----
 .../server/blockmanagement/DatanodeManager.java |  4 +-
 .../blockmanagement/DecommissionManager.java    |  2 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 57 ++++++--------------
 .../hadoop/hdfs/server/namenode/NameNode.java   |  1 +
 .../hadoop/hdfs/server/namenode/Namesystem.java |  2 +
 .../hadoop/hdfs/server/namenode/SafeMode.java   |  3 --
 .../blockmanagement/TestReplicationPolicy.java  |  3 --
 .../hdfs/server/namenode/NameNodeAdapter.java   |  2 +-
 .../hdfs/server/namenode/TestFSNamesystem.java  | 11 ++--
 .../namenode/TestListCorruptFileBlocks.java     |  3 +-
 12 files changed, 80 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/715b9c64/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b1e53da..5226d33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -894,6 +894,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8384. Allow NN to startup if there are files having a lease but are not
     under construction. (jing9)
 
+    HDFS-8984. Move replication queues related methods in FSNamesystem to
+    BlockManager. (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/715b9c64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 08fbd4f..b0a11fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -127,6 +128,10 @@ public class BlockManager implements BlockStatsMXBean {
   private volatile long corruptReplicaBlocksCount = 0L;
   private volatile long underReplicatedBlocksCount = 0L;
   private volatile long scheduledReplicationBlocksCount = 0L;
+
+  /** flag indicating whether replication queues have been initialized */
+  private boolean initializedReplQueues;
+
   private final AtomicLong excessBlocksCount = new AtomicLong(0L);
   private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
   private final long startupDelayBlockDeletionInMs;
@@ -1092,7 +1097,7 @@ public class BlockManager implements BlockStatsMXBean {
    * datanode and log the operation
    */
   void addToInvalidates(final Block block, final DatanodeInfo datanode) {
-    if (!namesystem.isPopulatingReplQueues()) {
+    if (!isPopulatingReplQueues()) {
       return;
     }
     invalidateBlocks.add(block, datanode, true);
@@ -1103,7 +1108,7 @@ public class BlockManager implements BlockStatsMXBean {
    * datanodes.
    */
   private void addToInvalidates(Block b) {
-    if (!namesystem.isPopulatingReplQueues()) {
+    if (!isPopulatingReplQueues()) {
       return;
     }
     StringBuilder datanodes = new StringBuilder();
@@ -1124,7 +1129,7 @@ public class BlockManager implements BlockStatsMXBean {
    * is wiped.
    */
   void removeFromInvalidates(final DatanodeInfo datanode) {
-    if (!namesystem.isPopulatingReplQueues()) {
+    if (!isPopulatingReplQueues()) {
       return;
     }
     invalidateBlocks.remove(datanode);
@@ -1211,7 +1216,7 @@ public class BlockManager implements BlockStatsMXBean {
         || corruptedDuringWrite) {
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(b, node);
-    } else if (namesystem.isPopulatingReplQueues()) {
+    } else if (isPopulatingReplQueues()) {
       // add the block to neededReplication
       updateNeededReplications(b.getStored(), -1, 0);
     }
@@ -2484,7 +2489,7 @@ public class BlockManager implements BlockStatsMXBean {
   throws IOException {
     assert (storedBlock != null && namesystem.hasWriteLock());
     if (!namesystem.isInStartupSafeMode() 
-        || namesystem.isPopulatingReplQueues()) {
+        || isPopulatingReplQueues()) {
       addStoredBlock(storedBlock, storageInfo, null, false);
       return;
     }
@@ -2586,7 +2591,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // do not try to handle over/under-replicated blocks during first safe mode
-    if (!namesystem.isPopulatingReplQueues()) {
+    if (!isPopulatingReplQueues()) {
       return storedBlock;
     }
 
@@ -3323,7 +3328,7 @@ public class BlockManager implements BlockStatsMXBean {
    */
   void processOverReplicatedBlocksOnReCommission(
       final DatanodeDescriptor srcNode) {
-    if (!namesystem.isPopulatingReplQueues()) {
+    if (!isPopulatingReplQueues()) {
       return;
     }
     final Iterator<BlockInfo> it = srcNode.getBlockIterator();
@@ -3417,7 +3422,7 @@ public class BlockManager implements BlockStatsMXBean {
       final int curReplicasDelta, int expectedReplicasDelta) {
     namesystem.writeLock();
     try {
-      if (!namesystem.isPopulatingReplQueues()) {
+      if (!isPopulatingReplQueues()) {
         return;
       }
       NumberReplicas repl = countNodes(block);
@@ -3662,7 +3667,7 @@ public class BlockManager implements BlockStatsMXBean {
       while (namesystem.isRunning()) {
         try {
           // Process replication work only when active NN is out of safe mode.
-          if (namesystem.isPopulatingReplQueues()) {
+          if (isPopulatingReplQueues()) {
             computeDatanodeWork();
             processPendingReplications();
             rescanPostponedMisreplicatedBlocks();
@@ -3790,4 +3795,35 @@ public class BlockManager implements BlockStatsMXBean {
   public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
     return  datanodeManager.getDatanodeStatistics().getStorageTypeStats();
   }
+
+  /**
+   * Initialize replication queues.
+   */
+  public void initializeReplQueues() {
+    LOG.info("initializing replication queues");
+    processMisReplicatedBlocks();
+    initializedReplQueues = true;
+  }
+
+  /**
+   * Check whether replication queues are currently being populated.
+   * @return true if the HA state allows it and the queues are initialized
+   */
+  public boolean isPopulatingReplQueues() {
+    if (!shouldPopulateReplQueues()) {
+      return false;
+    }
+    return initializedReplQueues;
+  }
+
+  public void setInitializedReplQueues(boolean v) {
+    this.initializedReplQueues = v;
+  }
+
+  public boolean shouldPopulateReplQueues() {
+    HAContext haContext = namesystem.getHAContext();
+    if (haContext == null || haContext.getState() == null)
+      return false;
+    return haContext.getState().shouldPopulateReplQueues();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/715b9c64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 95ec648..3114937 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1200,7 +1200,7 @@ public class DatanodeManager {
     if (!hasClusterEverBeenMultiRack && networktopology.getNumOfRacks() > 1) {
       String message = "DN " + node + " joining cluster has expanded a formerly " +
           "single-rack cluster to be multi-rack. ";
-      if (namesystem.isPopulatingReplQueues()) {
+      if (blockManager.isPopulatingReplQueues()) {
         message += "Re-checking all blocks for replication, since they should " +
             "now be replicated cross-rack";
         LOG.info(message);
@@ -1210,7 +1210,7 @@ public class DatanodeManager {
         LOG.debug(message);
       }
       hasClusterEverBeenMultiRack = true;
-      if (namesystem.isPopulatingReplQueues()) {
+      if (blockManager.isPopulatingReplQueues()) {
         blockManager.processMisReplicatedBlocks();
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/715b9c64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 1a20ab0..6d199d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -546,7 +546,7 @@ public class DecommissionManager {
         if (blockManager.isNeededReplication(block, liveReplicas)) {
           if (!blockManager.neededReplications.contains(block) &&
               blockManager.pendingReplications.getNumReplicas(block) == 0 &&
-              namesystem.isPopulatingReplQueues()) {
+              blockManager.isPopulatingReplQueues()) {
             // Process these blocks only when active NN is out of safe mode.
             blockManager.neededReplications.add(block,
                 curReplicas,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/715b9c64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 25b6928..1b770b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -506,9 +506,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   private final boolean haEnabled;
 
-  /** flag indicating whether replication queues have been initialized */
-  boolean initializedReplQueues = false;
-
   /**
    * Whether the namenode is in the middle of starting the active service
    */
@@ -1038,7 +1035,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       nnResourceChecker = new NameNodeResourceChecker(conf);
       checkAvailableResources();
-      assert safeMode != null && !isPopulatingReplQueues();
+      assert safeMode != null && !blockManager.isPopulatingReplQueues();
       StartupProgress prog = NameNode.getStartupProgress();
       prog.beginPhase(Phase.SAFEMODE);
       long completeBlocksTotal = getCompleteBlocksTotal();
@@ -1105,7 +1102,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         // Only need to re-process the queue, If not in SafeMode.
         if (!isInSafeMode()) {
           LOG.info("Reprocessing replication and invalidation queues");
-          initializeReplQueues();
+          blockManager.initializeReplQueues();
         }
 
         if (LOG.isDebugEnabled()) {
@@ -1164,15 +1161,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return haContext != null &&
         haContext.getState().getServiceState() == HAServiceState.ACTIVE;
   }
-  
-  /**
-   * Initialize replication queues.
-   */
-  private void initializeReplQueues() {
-    LOG.info("initializing replication queues");
-    blockManager.processMisReplicatedBlocks();
-    initializedReplQueues = true;
-  }
 
   /**
    * @return Whether the namenode is transitioning to active state and is in the
@@ -1225,8 +1213,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
         // Don't want to keep replication queues when not in Active.
         blockManager.clearQueues();
+        blockManager.setInitializedReplQueues(false);
       }
-      initializedReplQueues = false;
     } finally {
       writeUnlock();
     }
@@ -4237,8 +4225,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     private synchronized void leave() {
       // if not done yet, initialize replication queues.
       // In the standby, do not populate repl queues
-      if (!isPopulatingReplQueues() && shouldPopulateReplQueues()) {
-        initializeReplQueues();
+      if (!blockManager.isPopulatingReplQueues() && blockManager.shouldPopulateReplQueues()) {
+        blockManager.initializeReplQueues();
       }
       long timeInSafemode = now() - startTime;
       NameNode.stateChangeLog.info("STATE* Leaving safe mode after " 
@@ -4274,7 +4262,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
      * initializing replication queues.
      */
     private synchronized boolean canInitializeReplQueues() {
-      return shouldPopulateReplQueues()
+      return blockManager.shouldPopulateReplQueues()
           && blockSafe >= blockReplQueueThreshold;
     }
       
@@ -4327,9 +4315,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (smmthread == null && needEnter()) {
         enter();
         // check if we are ready to initialize replication queues
-        if (canInitializeReplQueues() && !isPopulatingReplQueues()
+        if (canInitializeReplQueues() && !blockManager.isPopulatingReplQueues()
             && !haEnabled) {
-          initializeReplQueues();
+          blockManager.initializeReplQueues();
         }
         reportStatus("STATE* Safe mode ON.", false);
         return;
@@ -4354,8 +4342,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       }
 
       // check if we are ready to initialize replication queues
-      if (canInitializeReplQueues() && !isPopulatingReplQueues() && !haEnabled) {
-        initializeReplQueues();
+      if (canInitializeReplQueues() && !blockManager.isPopulatingReplQueues() && !haEnabled) {
+        blockManager.initializeReplQueues();
       }
     }
       
@@ -4658,24 +4646,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       && safeMode.isOn();
   }
 
-  /**
-   * Check if replication queues are to be populated
-   * @return true when node is HAState.Active and not in the very first safemode
-   */
-  @Override
-  public boolean isPopulatingReplQueues() {
-    if (!shouldPopulateReplQueues()) {
-      return false;
-    }
-    return initializedReplQueues;
-  }
-
-  private boolean shouldPopulateReplQueues() {
-    if(haContext == null || haContext.getState() == null)
-      return false;
-    return haContext.getState().shouldPopulateReplQueues();
-  }
-
   @Override
   public void incrementSafeBlockCount(int replication) {
     // safeMode is volatile, and may be set to null at any time
@@ -5493,7 +5463,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      if (!isPopulatingReplQueues()) {
+      if (!blockManager.isPopulatingReplQueues()) {
         throw new IOException("Cannot run listCorruptFileBlocks because " +
                               "replication queues have not been initialized.");
       }
@@ -6169,6 +6139,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return cacheManager;
   }
 
+  @Override
+  public HAContext getHAContext() {
+    return haContext;
+  }
+
   @Override  // NameNodeMXBean
   public String getCorruptFiles() {
     List<String> list = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/715b9c64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index fac2d37..6e32066 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/715b9c64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index 4a208d8..5bc4033 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
@@ -51,4 +52,5 @@ public interface Namesystem extends RwLock, SafeMode {
   boolean isInSnapshot(BlockInfo blockUC);
 
   CacheManager getCacheManager();
+  HAContext getHAContext();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/715b9c64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
index 1428482..06a8219 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
@@ -39,9 +39,6 @@ public interface SafeMode {
    */
   public boolean isInStartupSafeMode();
 
-  /** Check whether replication queues are being populated. */
-  public boolean isPopulatingReplQueues();
-    
   /**
    * Increment number of blocks that reached minimal replication.
    * @param replication current replication 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/715b9c64/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 27d647c..278b105 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -1221,7 +1221,6 @@ public class TestReplicationPolicy {
   public void testAddStoredBlockDoesNotCauseSkippedReplication()
       throws IOException {
     Namesystem mockNS = mock(Namesystem.class);
-    when(mockNS.isPopulatingReplQueues()).thenReturn(true);
     when(mockNS.hasWriteLock()).thenReturn(true);
     when(mockNS.hasReadLock()).thenReturn(true);
     BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
@@ -1271,7 +1270,6 @@ public class TestReplicationPolicy {
       testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
           throws IOException {
     Namesystem mockNS = mock(Namesystem.class);
-    when(mockNS.isPopulatingReplQueues()).thenReturn(true);
     when(mockNS.hasReadLock()).thenReturn(true);
 
     BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
@@ -1334,7 +1332,6 @@ public class TestReplicationPolicy {
   public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
       throws IOException {
     Namesystem mockNS = mock(Namesystem.class);
-    when(mockNS.isPopulatingReplQueues()).thenReturn(true);
     when(mockNS.hasReadLock()).thenReturn(true);
 
     BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/715b9c64/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index b314584..64f614d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -243,7 +243,7 @@ public class NameNodeAdapter {
    * @return Replication queue initialization status
    */
   public static boolean safeModeInitializedReplQueues(NameNode nn) {
-    return nn.getNamesystem().isPopulatingReplQueues();
+    return nn.getNamesystem().getBlockManager().isPopulatingReplQueues();
   }
   
   public static File getInProgressEditsFile(StorageDirectory sd, long startTxId) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/715b9c64/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
index 7f72797..c2d8c57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
@@ -123,13 +124,15 @@ public class TestFSNamesystem {
 
     FSNamesystem fsNamesystem = new FSNamesystem(conf, fsImage);
     FSNamesystem fsn = Mockito.spy(fsNamesystem);
+    BlockManager bm = fsn.getBlockManager();
+    Whitebox.setInternalState(bm, "namesystem", fsn);
 
     //Make shouldPopulaeReplQueues return true
     HAContext haContext = Mockito.mock(HAContext.class);
     HAState haState = Mockito.mock(HAState.class);
     Mockito.when(haContext.getState()).thenReturn(haState);
     Mockito.when(haState.shouldPopulateReplQueues()).thenReturn(true);
-    Whitebox.setInternalState(fsn, "haContext", haContext);
+    Mockito.when(fsn.getHAContext()).thenReturn(haContext);
 
     //Make NameNode.getNameNodeMetrics() not return null
     NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
@@ -137,15 +140,15 @@ public class TestFSNamesystem {
     fsn.enterSafeMode(false);
     assertTrue("FSNamesystem didn't enter safemode", fsn.isInSafeMode());
     assertTrue("Replication queues were being populated during very first "
-        + "safemode", !fsn.isPopulatingReplQueues());
+        + "safemode", !bm.isPopulatingReplQueues());
     fsn.leaveSafeMode();
     assertTrue("FSNamesystem didn't leave safemode", !fsn.isInSafeMode());
     assertTrue("Replication queues weren't being populated even after leaving "
-      + "safemode", fsn.isPopulatingReplQueues());
+      + "safemode", bm.isPopulatingReplQueues());
     fsn.enterSafeMode(false);
     assertTrue("FSNamesystem didn't enter safemode", fsn.isInSafeMode());
     assertTrue("Replication queues weren't being populated after entering "
-      + "safemode 2nd time", fsn.isPopulatingReplQueues());
+      + "safemode 2nd time", bm.isPopulatingReplQueues());
   }
   
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/715b9c64/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
index 3afdd0e..99dce1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
@@ -210,7 +210,8 @@ public class TestListCorruptFileBlocks {
       fs = cluster.getFileSystem();
 
       // wait until replication queues have been initialized
-      while (!cluster.getNameNode().namesystem.isPopulatingReplQueues()) {
+      while (!cluster.getNameNode().namesystem.getBlockManager()
+          .isPopulatingReplQueues()) {
         try {
           LOG.info("waiting for replication queues");
           Thread.sleep(1000);