Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/08/23 00:28:18 UTC

svn commit: r1160475 - in /hadoop/common/trunk/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java

Author: szetszwo
Date: Mon Aug 22 22:28:17 2011
New Revision: 1160475

URL: http://svn.apache.org/viewvc?rev=1160475&view=rev
Log:
HDFS-2273.  Refactor BlockManager.recentInvalidateSets to a new class.
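
The refactoring replaces two pieces of BlockManager state that previously had to be kept in sync by hand (the recentInvalidateSets map and the separately maintained pendingDeletionBlocksCount counter) with a single class whose synchronized methods update both together. A minimal sketch of the pattern, not the committed code: Long block IDs stand in for the Block type here, and the real class below additionally handles logging, per-storage removal, and the per-message invalidation limit.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class InvalidateBlocksSketch {
  // Mapping: StorageID -> blocks to be invalidated on that storage.
  private final Map<String, Set<Long>> node2blocks =
      new TreeMap<String, Set<Long>>();
  // Total block count; updated only inside synchronized methods,
  // so it cannot drift from the map contents.
  private long numBlocks = 0L;

  synchronized void add(final String storageID, final long blockId) {
    Set<Long> set = node2blocks.get(storageID);
    if (set == null) {
      set = new HashSet<Long>();
      node2blocks.put(storageID, set);
    }
    if (set.add(blockId)) {
      numBlocks++;
    }
  }

  synchronized long numBlocks() {
    return numBlocks;
  }
}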

Added:
    hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
Modified:
    hadoop/common/trunk/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

Modified: hadoop/common/trunk/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/CHANGES.txt?rev=1160475&r1=1160474&r2=1160475&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs/CHANGES.txt Mon Aug 22 22:28:17 2011
@@ -673,6 +673,9 @@ Trunk (unreleased changes)
 
     HDFS-2096. Mavenization of hadoop-hdfs (Alejandro Abdelnur via tomwhite)
 
+    HDFS-2273.  Refactor BlockManager.recentInvalidateSets to a new class.
+    (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1160475&r1=1160474&r2=1160475&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Mon Aug 22 22:28:17 2011
@@ -24,7 +24,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -89,7 +88,6 @@ public class BlockManager {
   private volatile long underReplicatedBlocksCount = 0L;
   private volatile long scheduledReplicationBlocksCount = 0L;
   private volatile long excessBlocksCount = 0L;
-  private volatile long pendingDeletionBlocksCount = 0L;
   
   /** Used by metrics */
   public long getPendingReplicationBlocksCount() {
@@ -109,7 +107,7 @@ public class BlockManager {
   }
   /** Used by metrics */
   public long getPendingDeletionBlocksCount() {
-    return pendingDeletionBlocksCount;
+    return invalidateBlocks.numBlocks();
   }
   /** Used by metrics */
   public long getExcessBlocksCount() {
@@ -131,14 +129,8 @@ public class BlockManager {
   /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
   final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
 
-  //
-  // Keeps a Collection for every named machine containing
-  // blocks that have recently been invalidated and are thought to live
-  // on the machine in question.
-  // Mapping: StorageID -> ArrayList<Block>
-  //
-  private final Map<String, Collection<Block>> recentInvalidateSets =
-    new TreeMap<String, Collection<Block>>();
+  /** Blocks to be invalidated. */
+  private final InvalidateBlocks invalidateBlocks;
 
   //
   // Keeps a TreeSet for every named node. Each treeset contains
@@ -182,6 +174,7 @@ public class BlockManager {
     namesystem = fsn;
     datanodeManager = new DatanodeManager(this, fsn, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
+    invalidateBlocks = new InvalidateBlocks(datanodeManager);
 
     blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
     blockplacement = BlockPlacementPolicy.getInstance(
@@ -306,7 +299,9 @@ public class BlockManager {
     this.blockplacement = newpolicy;
   }
 
+  /** Dump metadata to out. */
   public void metaSave(PrintWriter out) {
+    assert namesystem.hasWriteLock();
     //
     // Dump contents of neededReplication
     //
@@ -357,7 +352,7 @@ public class BlockManager {
     pendingReplications.metaSave(out);
 
     // Dump blocks that are waiting to be deleted
-    dumpRecentInvalidateSets(out);
+    invalidateBlocks.dump(out);
 
     // Dump all datanodes
     getDatanodeManager().datanodeDump(out);
@@ -493,7 +488,7 @@ public class BlockManager {
     // remove this block from the list of pending blocks to be deleted. 
     for (DatanodeDescriptor dd : targets) {
       String datanodeId = dd.getStorageID();
-      removeFromInvalidates(datanodeId, oldBlock);
+      invalidateBlocks.remove(datanodeId, oldBlock);
     }
 
     long fileLength = fileINode.computeContentSummary().getLength();
@@ -510,7 +505,7 @@ public class BlockManager {
       blocksMap.nodeIterator(block); it.hasNext();) {
       String storageID = it.next().getStorageID();
       // filter invalidate replicas
-      if( ! belongsToInvalidates(storageID, block)) {
+      if(!invalidateBlocks.contains(storageID, block)) {
         machineSet.add(storageID);
       }
     }
@@ -754,64 +749,15 @@ public class BlockManager {
     }
 
     node.resetBlocks();
-    removeFromInvalidates(node.getStorageID());
-  }
-  
-  private void removeFromInvalidates(String storageID, Block block) {
-    synchronized(recentInvalidateSets) {
-      Collection<Block> v = recentInvalidateSets.get(storageID);
-      if (v != null && v.remove(block)) {
-        pendingDeletionBlocksCount--;
-        if (v.isEmpty()) {
-          recentInvalidateSets.remove(storageID);
-        }
-      }
-    }
-  }
-
-  boolean belongsToInvalidates(String storageID, Block block) {
-    Collection<Block> invalidateSet;
-    synchronized(recentInvalidateSets) {
-      invalidateSet = recentInvalidateSets.get(storageID);
-      return invalidateSet != null && invalidateSet.contains(block);
-    }
-  }
-
-  /**
-   * Adds block to list of blocks which will be invalidated on specified
-   * datanode
-   *
-   * @param b block
-   * @param dn datanode
-   * @param log true to create an entry in the log 
-   */
-  private void addToInvalidates(Block b, DatanodeInfo dn, boolean log) {
-    synchronized(recentInvalidateSets) {
-      Collection<Block> invalidateSet = recentInvalidateSets
-          .get(dn.getStorageID());
-      if (invalidateSet == null) {
-        invalidateSet = new HashSet<Block>();
-        recentInvalidateSets.put(dn.getStorageID(), invalidateSet);
-      }
-      if (invalidateSet.add(b)) {
-        pendingDeletionBlocksCount++;
-        if (log) {
-          NameNode.stateChangeLog.info("BLOCK* addToInvalidates: "
-              + b + " to " + dn.getName());
-        }
-      }
-    }
+    invalidateBlocks.remove(node.getStorageID());
   }
 
   /**
    * Adds block to list of blocks which will be invalidated on specified
    * datanode and log the operation
-   *
-   * @param b block
-   * @param dn datanode
    */
-  void addToInvalidates(Block b, DatanodeInfo dn) {
-    addToInvalidates(b, dn, true);
+  void addToInvalidates(final Block block, final DatanodeInfo datanode) {
+    invalidateBlocks.add(block, datanode, true);
   }
 
   /**
@@ -823,7 +769,7 @@ public class BlockManager {
     for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); it
         .hasNext();) {
       DatanodeDescriptor node = it.next();
-      addToInvalidates(b, node, false);
+      invalidateBlocks.add(b, node, false);
       datanodes.append(node.getName()).append(" ");
     }
     if (datanodes.length() != 0) {
@@ -833,30 +779,6 @@ public class BlockManager {
   }
 
   /**
-   * dumps the contents of recentInvalidateSets
-   */
-  private void dumpRecentInvalidateSets(PrintWriter out) {
-    assert namesystem.hasWriteLock();
-    int size;
-    synchronized(recentInvalidateSets) {
-      size = recentInvalidateSets.values().size();
-    }
-    out.println("Metasave: Blocks " + pendingDeletionBlocksCount 
-        + " waiting deletion from " + size + " datanodes.");
-    if (size == 0) {
-      return;
-    }
-    synchronized(recentInvalidateSets) {
-      for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
-        Collection<Block> blocks = entry.getValue();
-        if (blocks.size() > 0) {
-          out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
-        }
-      }
-    }
-  }
-
-  /**
    * Mark the block belonging to datanode as corrupt
    * @param blk Block to be marked as corrupt
    * @param dn Datanode which holds the corrupt replica
@@ -962,35 +884,14 @@ public class BlockManager {
    * @return total number of block for deletion
    */
   int computeInvalidateWork(int nodesToProcess) {
-    int numOfNodes;
-    ArrayList<String> keyArray;
+    final List<String> nodes = invalidateBlocks.getStorageIDs();
+    Collections.shuffle(nodes);
 
-    synchronized(recentInvalidateSets) {
-      numOfNodes = recentInvalidateSets.size();
-      // get an array of the keys
-      keyArray = new ArrayList<String>(recentInvalidateSets.keySet());
-    }
-
-    nodesToProcess = Math.min(numOfNodes, nodesToProcess);
-
-    // randomly pick up <i>nodesToProcess</i> nodes
-    // and put them at [0, nodesToProcess)
-    int remainingNodes = numOfNodes - nodesToProcess;
-    if (nodesToProcess < remainingNodes) {
-      for(int i=0; i<nodesToProcess; i++) {
-        int keyIndex = DFSUtil.getRandom().nextInt(numOfNodes-i)+i;
-        Collections.swap(keyArray, keyIndex, i); // swap to front
-      }
-    } else {
-      for(int i=0; i<remainingNodes; i++) {
-        int keyIndex = DFSUtil.getRandom().nextInt(numOfNodes-i);
-        Collections.swap(keyArray, keyIndex, numOfNodes-i-1); // swap to end
-      }
-    }
+    nodesToProcess = Math.min(nodes.size(), nodesToProcess);
 
     int blockCnt = 0;
     for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
-      blockCnt += invalidateWorkForOneNode(keyArray.get(nodeCnt));
+      blockCnt += invalidateWorkForOneNode(nodes.get(nodeCnt));
     }
     return blockCnt;
   }
@@ -1592,7 +1493,7 @@ public class BlockManager {
     }
 
     // Ignore replicas already scheduled to be removed from the DN
-    if(belongsToInvalidates(dn.getStorageID(), block)) {
+    if(invalidateBlocks.contains(dn.getStorageID(), block)) {
       assert storedBlock.findDatanode(dn) < 0 : "Block " + block
         + " in recentInvalidatesSet should not appear in DN " + dn;
       return storedBlock;
@@ -2371,7 +2272,7 @@ public class BlockManager {
   }
 
   public int getActiveBlockCount() {
-    return blocksMap.size() - (int)pendingDeletionBlocksCount;
+    return blocksMap.size() - (int)invalidateBlocks.numBlocks();
   }
 
   public DatanodeDescriptor[] getNodes(BlockInfo block) {
@@ -2441,16 +2342,6 @@ public class BlockManager {
     return fileINode.getReplication();
   }
 
-  /** Remove a datanode from the invalidatesSet */
-  private void removeFromInvalidates(String storageID) {
-    Collection<Block> blocks;
-    synchronized(recentInvalidateSets) {
-      blocks = recentInvalidateSets.remove(storageID);
-    }
-    if (blocks != null) {
-      pendingDeletionBlocksCount -= blocks.size();
-    }
-  }
 
   /**
    * Get blocks to invalidate for <i>nodeId</i>
@@ -2466,49 +2357,7 @@ public class BlockManager {
         return 0;
       // get blocks to invalidate for the nodeId
       assert nodeId != null;
-      final DatanodeDescriptor dn = datanodeManager.getDatanode(nodeId);
-      if (dn == null) {
-        removeFromInvalidates(nodeId);
-        return 0;
-      }
-
-      Collection<Block> invalidateSet;
-      ArrayList<Block> blocksToInvalidate;
-      synchronized(recentInvalidateSets) {
-        invalidateSet = recentInvalidateSets.get(nodeId);
-        if (invalidateSet == null)
-          return 0;
-
-        blocksToInvalidate = new ArrayList<Block>(
-          getDatanodeManager().blockInvalidateLimit);
-
-        // # blocks that can be sent in one message is limited
-        Iterator<Block> it = invalidateSet.iterator();
-        for (int blkCount = 0; blkCount < getDatanodeManager().blockInvalidateLimit
-            && it.hasNext(); blkCount++) {
-          blocksToInvalidate.add(it.next());
-          it.remove();
-        }
-
-        // If we send everything in this message, remove this node entry
-        if (!it.hasNext()) {
-          removeFromInvalidates(nodeId);
-        }
-
-        dn.addBlocksToBeInvalidated(blocksToInvalidate);
-
-        if (NameNode.stateChangeLog.isInfoEnabled()) {
-          StringBuilder blockList = new StringBuilder();
-          for (Block blk : blocksToInvalidate) {
-            blockList.append(' ');
-            blockList.append(blk);
-          }
-          NameNode.stateChangeLog.info("BLOCK* ask " + dn.getName()
-              + " to delete " + blockList);
-        }
-        pendingDeletionBlocksCount -= blocksToInvalidate.size();
-        return blocksToInvalidate.size();
-      }
+      return invalidateBlocks.invalidateWork(nodeId);
     } finally {
       namesystem.writeUnlock();
     }
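
Note that computeInvalidateWork above now shuffles the full list of storage IDs with Collections.shuffle and takes a prefix, where the old code hand-rolled a partial selection with DFSUtil.getRandom(), swapping either the chosen keys to the front or the remainder to the end, whichever needed fewer swaps. Both pick nodesToProcess storage IDs uniformly at random; the shuffle does O(n) work instead of O(min(k, n-k)) swaps, a simpler trade that is cheap at this scale. A standalone sketch of the two selection strategies, assuming a plain java.util.Random in place of DFSUtil.getRandom():

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class SelectionSketch {
  public static void main(String[] args) {
    final List<String> keys = new ArrayList<String>();
    for (int i = 0; i < 10; i++) {
      keys.add("storage-" + i);
    }
    final int nodesToProcess = 3;
    final Random r = new Random();

    // New approach: shuffle everything, then use the prefix.
    final List<String> shuffled = new ArrayList<String>(keys);
    Collections.shuffle(shuffled, r);
    System.out.println(shuffled.subList(0, nodesToProcess));

    // Old approach: swap nodesToProcess randomly chosen keys to the front.
    final List<String> swapped = new ArrayList<String>(keys);
    for (int i = 0; i < nodesToProcess; i++) {
      final int keyIndex = r.nextInt(swapped.size() - i) + i;
      Collections.swap(swapped, keyIndex, i); // swap to front
    }
    System.out.println(swapped.subList(0, nodesToProcess));
  }
}

Either way the printed sublist is a uniformly random 3-element selection; only the amount of shuffling work differs.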

Added: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1160475&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (added)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Mon Aug 22 22:28:17 2011
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+/** 
+ * Keeps a Collection for every named machine containing blocks
+ * that have recently been invalidated and are thought to live
+ * on the machine in question.
+ */
+@InterfaceAudience.Private
+class InvalidateBlocks {
+  /** Mapping: StorageID -> Collection of Blocks */
+  private final Map<String, Collection<Block>> node2blocks =
+      new TreeMap<String, Collection<Block>>();
+  /** The total number of blocks in the map. */
+  private long numBlocks = 0L;
+
+  private final DatanodeManager datanodeManager;
+
+  InvalidateBlocks(final DatanodeManager datanodeManager) {
+    this.datanodeManager = datanodeManager;
+  }
+
+  /** @return the number of blocks to be invalidated. */
+  synchronized long numBlocks() {
+    return numBlocks;
+  }
+
+  /** Does this contain the block which is associated with the storage? */
+  synchronized boolean contains(final String storageID, final Block block) {
+    final Collection<Block> s = node2blocks.get(storageID);
+    return s != null && s.contains(block);
+  }
+
+  /**
+   * Add a block to the block collection
+   * which will be invalidated on the specified datanode.
+   */
+  synchronized void add(final Block block, final DatanodeInfo datanode,
+      final boolean log) {
+    Collection<Block> set = node2blocks.get(datanode.getStorageID());
+    if (set == null) {
+      set = new HashSet<Block>();
+      node2blocks.put(datanode.getStorageID(), set);
+    }
+    if (set.add(block)) {
+      numBlocks++;
+      if (log) {
+        NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
+            + ": add " + block + " to " + datanode.getName());
+      }
+    }
+  }
+
+  /** Remove a storage from the invalidate set. */
+  synchronized void remove(final String storageID) {
+    final Collection<Block> blocks = node2blocks.remove(storageID);
+    if (blocks != null) {
+      numBlocks -= blocks.size();
+    }
+  }
+
+  /** Remove the block from the specified storage. */
+  synchronized void remove(final String storageID, final Block block) {
+    final Collection<Block> v = node2blocks.get(storageID);
+    if (v != null && v.remove(block)) {
+      numBlocks--;
+      if (v.isEmpty()) {
+        node2blocks.remove(storageID);
+      }
+    }
+  }
+
+  /** Print the contents to out. */
+  synchronized void dump(final PrintWriter out) {
+    final int size = node2blocks.values().size();
+    out.println("Metasave: Blocks " + numBlocks 
+        + " waiting deletion from " + size + " datanodes.");
+    if (size == 0) {
+      return;
+    }
+
+    for(Map.Entry<String,Collection<Block>> entry : node2blocks.entrySet()) {
+      final Collection<Block> blocks = entry.getValue();
+      if (blocks.size() > 0) {
+        out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
+      }
+    }
+  }
+
+  /** @return a list of the storage IDs. */
+  synchronized List<String> getStorageIDs() {
+    return new ArrayList<String>(node2blocks.keySet());
+  }
+
+  /** Invalidate work for the storage. */
+  int invalidateWork(final String storageId) {
+    final DatanodeDescriptor dn = datanodeManager.getDatanode(storageId);
+    if (dn == null) {
+      remove(storageId);
+      return 0;
+    }
+    final List<Block> toInvalidate = invalidateWork(storageId, dn);
+    if (toInvalidate == null) {
+      return 0;
+    }
+
+    if (NameNode.stateChangeLog.isInfoEnabled()) {
+      NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
+          + ": ask " + dn.getName() + " to delete " + toInvalidate);
+    }
+    return toInvalidate.size();
+  }
+
+  private synchronized List<Block> invalidateWork(
+      final String storageId, final DatanodeDescriptor dn) {
+    final Collection<Block> set = node2blocks.get(storageId);
+    if (set == null) {
+      return null;
+    }
+
+    // # blocks that can be sent in one message is limited
+    final int limit = datanodeManager.blockInvalidateLimit;
+    final List<Block> toInvalidate = new ArrayList<Block>(limit);
+    final Iterator<Block> it = set.iterator();
+    for(int count = 0; count < limit && it.hasNext(); count++) {
+      toInvalidate.add(it.next());
+      it.remove();
+    }
+    // If we send everything in this message, remove this node entry
+    if (!it.hasNext()) {
+      remove(storageId);
+    }
+
+    dn.addBlocksToBeInvalidated(toInvalidate);
+    numBlocks -= toInvalidate.size();
+    return toInvalidate;
+  }
+}
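
For reference, the batching behaviour of invalidateWork can be exercised in isolation: blocks are drained from a storage's collection at most blockInvalidateLimit at a time, and the storage's entry is removed once its collection is empty. A standalone sketch, assuming Long IDs in place of Block and a hard-coded limit in place of DatanodeManager.blockInvalidateLimit:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class DrainSketch {
  public static void main(String[] args) {
    final Map<String, Set<Long>> node2blocks = new TreeMap<String, Set<Long>>();
    final Set<Long> blocks = new HashSet<Long>();
    for (long b = 0; b < 7; b++) {
      blocks.add(b);
    }
    node2blocks.put("storage-0", blocks);

    final int limit = 3; // stands in for DatanodeManager.blockInvalidateLimit
    while (node2blocks.containsKey("storage-0")) {
      final List<Long> toInvalidate = new ArrayList<Long>(limit);
      final Iterator<Long> it = node2blocks.get("storage-0").iterator();
      for (int count = 0; count < limit && it.hasNext(); count++) {
        toInvalidate.add(it.next());
        it.remove();
      }
      // If we send everything in this message, remove the node entry.
      if (!it.hasNext()) {
        node2blocks.remove("storage-0");
      }
      System.out.println("ask storage-0 to delete " + toInvalidate);
    }
    // Prints batches of at most 3 block IDs, e.g.
    //   ask storage-0 to delete [0, 1, 2]
    //   ask storage-0 to delete [3, 4, 5]
    //   ask storage-0 to delete [6]
  }
}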