You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/08/23 00:28:18 UTC
svn commit: r1160475 - in /hadoop/common/trunk/hadoop-hdfs: CHANGES.txt
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
Author: szetszwo
Date: Mon Aug 22 22:28:17 2011
New Revision: 1160475
URL: http://svn.apache.org/viewvc?rev=1160475&view=rev
Log:
HDFS-2273. Refactor BlockManager.recentInvalidateSets to a new class.
Added:
hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
Modified:
hadoop/common/trunk/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
Modified: hadoop/common/trunk/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/CHANGES.txt?rev=1160475&r1=1160474&r2=1160475&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs/CHANGES.txt Mon Aug 22 22:28:17 2011
@@ -673,6 +673,9 @@ Trunk (unreleased changes)
HDFS-2096. Mavenization of hadoop-hdfs (Alejandro Abdelnur via tomwhite)
+ HDFS-2273. Refactor BlockManager.recentInvalidateSets to a new class.
+ (szetszwo)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1160475&r1=1160474&r2=1160475&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Mon Aug 22 22:28:17 2011
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -89,7 +88,6 @@ public class BlockManager {
private volatile long underReplicatedBlocksCount = 0L;
private volatile long scheduledReplicationBlocksCount = 0L;
private volatile long excessBlocksCount = 0L;
- private volatile long pendingDeletionBlocksCount = 0L;
/** Used by metrics */
public long getPendingReplicationBlocksCount() {
@@ -109,7 +107,7 @@ public class BlockManager {
}
/** Used by metrics */
public long getPendingDeletionBlocksCount() {
- return pendingDeletionBlocksCount;
+ return invalidateBlocks.numBlocks();
}
/** Used by metrics */
public long getExcessBlocksCount() {
@@ -131,14 +129,8 @@ public class BlockManager {
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
- //
- // Keeps a Collection for every named machine containing
- // blocks that have recently been invalidated and are thought to live
- // on the machine in question.
- // Mapping: StorageID -> ArrayList<Block>
- //
- private final Map<String, Collection<Block>> recentInvalidateSets =
- new TreeMap<String, Collection<Block>>();
+ /** Blocks to be invalidated. */
+ private final InvalidateBlocks invalidateBlocks;
//
// Keeps a TreeSet for every named node. Each treeset contains
@@ -182,6 +174,7 @@ public class BlockManager {
namesystem = fsn;
datanodeManager = new DatanodeManager(this, fsn, conf);
heartbeatManager = datanodeManager.getHeartbeatManager();
+ invalidateBlocks = new InvalidateBlocks(datanodeManager);
blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
blockplacement = BlockPlacementPolicy.getInstance(
@@ -306,7 +299,9 @@ public class BlockManager {
this.blockplacement = newpolicy;
}
+ /** Dump meta data to out. */
public void metaSave(PrintWriter out) {
+ assert namesystem.hasWriteLock();
//
// Dump contents of neededReplication
//
@@ -357,7 +352,7 @@ public class BlockManager {
pendingReplications.metaSave(out);
// Dump blocks that are waiting to be deleted
- dumpRecentInvalidateSets(out);
+ invalidateBlocks.dump(out);
// Dump all datanodes
getDatanodeManager().datanodeDump(out);
@@ -493,7 +488,7 @@ public class BlockManager {
// remove this block from the list of pending blocks to be deleted.
for (DatanodeDescriptor dd : targets) {
String datanodeId = dd.getStorageID();
- removeFromInvalidates(datanodeId, oldBlock);
+ invalidateBlocks.remove(datanodeId, oldBlock);
}
long fileLength = fileINode.computeContentSummary().getLength();
@@ -510,7 +505,7 @@ public class BlockManager {
blocksMap.nodeIterator(block); it.hasNext();) {
String storageID = it.next().getStorageID();
// filter invalidate replicas
- if( ! belongsToInvalidates(storageID, block)) {
+ if(!invalidateBlocks.contains(storageID, block)) {
machineSet.add(storageID);
}
}
@@ -754,64 +749,15 @@ public class BlockManager {
}
node.resetBlocks();
- removeFromInvalidates(node.getStorageID());
- }
-
- private void removeFromInvalidates(String storageID, Block block) {
- synchronized(recentInvalidateSets) {
- Collection<Block> v = recentInvalidateSets.get(storageID);
- if (v != null && v.remove(block)) {
- pendingDeletionBlocksCount--;
- if (v.isEmpty()) {
- recentInvalidateSets.remove(storageID);
- }
- }
- }
- }
-
- boolean belongsToInvalidates(String storageID, Block block) {
- Collection<Block> invalidateSet;
- synchronized(recentInvalidateSets) {
- invalidateSet = recentInvalidateSets.get(storageID);
- return invalidateSet != null && invalidateSet.contains(block);
- }
- }
-
- /**
- * Adds block to list of blocks which will be invalidated on specified
- * datanode
- *
- * @param b block
- * @param dn datanode
- * @param log true to create an entry in the log
- */
- private void addToInvalidates(Block b, DatanodeInfo dn, boolean log) {
- synchronized(recentInvalidateSets) {
- Collection<Block> invalidateSet = recentInvalidateSets
- .get(dn.getStorageID());
- if (invalidateSet == null) {
- invalidateSet = new HashSet<Block>();
- recentInvalidateSets.put(dn.getStorageID(), invalidateSet);
- }
- if (invalidateSet.add(b)) {
- pendingDeletionBlocksCount++;
- if (log) {
- NameNode.stateChangeLog.info("BLOCK* addToInvalidates: "
- + b + " to " + dn.getName());
- }
- }
- }
+ invalidateBlocks.remove(node.getStorageID());
}
/**
* Adds block to list of blocks which will be invalidated on specified
* datanode and log the operation
- *
- * @param b block
- * @param dn datanode
*/
- void addToInvalidates(Block b, DatanodeInfo dn) {
- addToInvalidates(b, dn, true);
+ void addToInvalidates(final Block block, final DatanodeInfo datanode) {
+ invalidateBlocks.add(block, datanode, true);
}
/**
@@ -823,7 +769,7 @@ public class BlockManager {
for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); it
.hasNext();) {
DatanodeDescriptor node = it.next();
- addToInvalidates(b, node, false);
+ invalidateBlocks.add(b, node, false);
datanodes.append(node.getName()).append(" ");
}
if (datanodes.length() != 0) {
@@ -833,30 +779,6 @@ public class BlockManager {
}
/**
- * dumps the contents of recentInvalidateSets
- */
- private void dumpRecentInvalidateSets(PrintWriter out) {
- assert namesystem.hasWriteLock();
- int size;
- synchronized(recentInvalidateSets) {
- size = recentInvalidateSets.values().size();
- }
- out.println("Metasave: Blocks " + pendingDeletionBlocksCount
- + " waiting deletion from " + size + " datanodes.");
- if (size == 0) {
- return;
- }
- synchronized(recentInvalidateSets) {
- for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
- Collection<Block> blocks = entry.getValue();
- if (blocks.size() > 0) {
- out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
- }
- }
- }
- }
-
- /**
* Mark the block belonging to datanode as corrupt
* @param blk Block to be marked as corrupt
* @param dn Datanode which holds the corrupt replica
@@ -962,35 +884,14 @@ public class BlockManager {
* @return total number of block for deletion
*/
int computeInvalidateWork(int nodesToProcess) {
- int numOfNodes;
- ArrayList<String> keyArray;
+ final List<String> nodes = invalidateBlocks.getStorageIDs();
+ Collections.shuffle(nodes);
- synchronized(recentInvalidateSets) {
- numOfNodes = recentInvalidateSets.size();
- // get an array of the keys
- keyArray = new ArrayList<String>(recentInvalidateSets.keySet());
- }
-
- nodesToProcess = Math.min(numOfNodes, nodesToProcess);
-
- // randomly pick up <i>nodesToProcess</i> nodes
- // and put them at [0, nodesToProcess)
- int remainingNodes = numOfNodes - nodesToProcess;
- if (nodesToProcess < remainingNodes) {
- for(int i=0; i<nodesToProcess; i++) {
- int keyIndex = DFSUtil.getRandom().nextInt(numOfNodes-i)+i;
- Collections.swap(keyArray, keyIndex, i); // swap to front
- }
- } else {
- for(int i=0; i<remainingNodes; i++) {
- int keyIndex = DFSUtil.getRandom().nextInt(numOfNodes-i);
- Collections.swap(keyArray, keyIndex, numOfNodes-i-1); // swap to end
- }
- }
+ nodesToProcess = Math.min(nodes.size(), nodesToProcess);
int blockCnt = 0;
for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
- blockCnt += invalidateWorkForOneNode(keyArray.get(nodeCnt));
+ blockCnt += invalidateWorkForOneNode(nodes.get(nodeCnt));
}
return blockCnt;
}
@@ -1592,7 +1493,7 @@ public class BlockManager {
}
// Ignore replicas already scheduled to be removed from the DN
- if(belongsToInvalidates(dn.getStorageID(), block)) {
+ if(invalidateBlocks.contains(dn.getStorageID(), block)) {
assert storedBlock.findDatanode(dn) < 0 : "Block " + block
+ " in recentInvalidatesSet should not appear in DN " + dn;
return storedBlock;
@@ -2371,7 +2272,7 @@ public class BlockManager {
}
public int getActiveBlockCount() {
- return blocksMap.size() - (int)pendingDeletionBlocksCount;
+ return blocksMap.size() - (int)invalidateBlocks.numBlocks();
}
public DatanodeDescriptor[] getNodes(BlockInfo block) {
@@ -2441,16 +2342,6 @@ public class BlockManager {
return fileINode.getReplication();
}
- /** Remove a datanode from the invalidatesSet */
- private void removeFromInvalidates(String storageID) {
- Collection<Block> blocks;
- synchronized(recentInvalidateSets) {
- blocks = recentInvalidateSets.remove(storageID);
- }
- if (blocks != null) {
- pendingDeletionBlocksCount -= blocks.size();
- }
- }
/**
* Get blocks to invalidate for <i>nodeId</i>
@@ -2466,49 +2357,7 @@ public class BlockManager {
return 0;
// get blocks to invalidate for the nodeId
assert nodeId != null;
- final DatanodeDescriptor dn = datanodeManager.getDatanode(nodeId);
- if (dn == null) {
- removeFromInvalidates(nodeId);
- return 0;
- }
-
- Collection<Block> invalidateSet;
- ArrayList<Block> blocksToInvalidate;
- synchronized(recentInvalidateSets) {
- invalidateSet = recentInvalidateSets.get(nodeId);
- if (invalidateSet == null)
- return 0;
-
- blocksToInvalidate = new ArrayList<Block>(
- getDatanodeManager().blockInvalidateLimit);
-
- // # blocks that can be sent in one message is limited
- Iterator<Block> it = invalidateSet.iterator();
- for (int blkCount = 0; blkCount < getDatanodeManager().blockInvalidateLimit
- && it.hasNext(); blkCount++) {
- blocksToInvalidate.add(it.next());
- it.remove();
- }
-
- // If we send everything in this message, remove this node entry
- if (!it.hasNext()) {
- removeFromInvalidates(nodeId);
- }
-
- dn.addBlocksToBeInvalidated(blocksToInvalidate);
-
- if (NameNode.stateChangeLog.isInfoEnabled()) {
- StringBuilder blockList = new StringBuilder();
- for (Block blk : blocksToInvalidate) {
- blockList.append(' ');
- blockList.append(blk);
- }
- NameNode.stateChangeLog.info("BLOCK* ask " + dn.getName()
- + " to delete " + blockList);
- }
- pendingDeletionBlocksCount -= blocksToInvalidate.size();
- return blocksToInvalidate.size();
- }
+ return invalidateBlocks.invalidateWork(nodeId);
} finally {
namesystem.writeUnlock();
}
Added: hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1160475&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (added)
+++ hadoop/common/trunk/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Mon Aug 22 22:28:17 2011
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+/**
+ * Keeps, for every storage ID, the collection of blocks that have recently
+ * been invalidated and are thought to live on that machine, together with a
+ * running total of such blocks; all access to that state is synchronized.
+ */
+@InterfaceAudience.Private
+class InvalidateBlocks {
+ /** Mapping: StorageID -> Collection of Blocks (a HashSet created in add). */
+ private final Map<String, Collection<Block>> node2blocks =
+ new TreeMap<String, Collection<Block>>();
+ /** The total number of blocks over all collections in node2blocks. */
+ private long numBlocks = 0L;
+
+ private final DatanodeManager datanodeManager; // resolves storage IDs to datanodes; source of blockInvalidateLimit
+
+ InvalidateBlocks(final DatanodeManager datanodeManager) {
+ this.datanodeManager = datanodeManager;
+ }
+
+ /** @return the number of blocks to be invalidated. */
+ synchronized long numBlocks() {
+ return numBlocks;
+ }
+
+ /** @return true iff the block is recorded for invalidation on the given storage. */
+ synchronized boolean contains(final String storageID, final Block block) {
+ final Collection<Block> s = node2blocks.get(storageID);
+ return s != null && s.contains(block);
+ }
+
+ /**
+ * Add a block to the collection to be invalidated on the specified datanode,
+ * keyed by its storage ID; counted and (if log is true) logged only when new.
+ */
+ synchronized void add(final Block block, final DatanodeInfo datanode,
+ final boolean log) {
+ Collection<Block> set = node2blocks.get(datanode.getStorageID());
+ if (set == null) {
+ set = new HashSet<Block>();
+ node2blocks.put(datanode.getStorageID(), set);
+ }
+ if (set.add(block)) {
+ numBlocks++;
+ if (log) {
+ NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
+ + ": add " + block + " to " + datanode.getName());
+ }
+ }
+ }
+
+ /** Remove a storage and all of its blocks, reducing numBlocks by that many. */
+ synchronized void remove(final String storageID) {
+ final Collection<Block> blocks = node2blocks.remove(storageID);
+ if (blocks != null) {
+ numBlocks -= blocks.size();
+ }
+ }
+
+ /** Remove the block from the specified storage; drop the storage entry once it is empty. */
+ synchronized void remove(final String storageID, final Block block) {
+ final Collection<Block> v = node2blocks.get(storageID);
+ if (v != null && v.remove(block)) {
+ numBlocks--;
+ if (v.isEmpty()) {
+ node2blocks.remove(storageID);
+ }
+ }
+ }
+
+ /** Print a summary line to out, then one line per storage whose collection is non-empty. */
+ synchronized void dump(final PrintWriter out) {
+ final int size = node2blocks.values().size(); // number of storage entries in the map
+ out.println("Metasave: Blocks " + numBlocks
+ + " waiting deletion from " + size + " datanodes.");
+ if (size == 0) {
+ return;
+ }
+
+ for(Map.Entry<String,Collection<Block>> entry : node2blocks.entrySet()) {
+ final Collection<Block> blocks = entry.getValue();
+ if (blocks.size() > 0) { // NOTE(review): getDatanode below is not null-checked, unlike in invalidateWork - confirm it cannot return null here
+ out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
+ }
+ }
+ }
+
+ /** @return a new list holding a snapshot of the storage IDs. */
+ synchronized List<String> getStorageIDs() {
+ return new ArrayList<String>(node2blocks.keySet());
+ }
+
+ /** Hand up to blockInvalidateLimit blocks of the storage to its datanode descriptor; returns how many (0 if none). */
+ int invalidateWork(final String storageId) {
+ final DatanodeDescriptor dn = datanodeManager.getDatanode(storageId);
+ if (dn == null) { // no datanode for this storage ID: discard everything recorded for it
+ remove(storageId);
+ return 0;
+ }
+ final List<Block> toInvalidate = invalidateWork(storageId, dn);
+ if (toInvalidate == null) {
+ return 0;
+ }
+
+ if (NameNode.stateChangeLog.isInfoEnabled()) {
+ NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
+ + ": ask " + dn.getName() + " to delete " + toInvalidate);
+ }
+ return toInvalidate.size();
+ }
+
+ private synchronized List<Block> invalidateWork( // returns null when the storage has no entry in node2blocks
+ final String storageId, final DatanodeDescriptor dn) {
+ final Collection<Block> set = node2blocks.get(storageId);
+ if (set == null) {
+ return null;
+ }
+
+ // # blocks that can be sent in one message is limited
+ final int limit = datanodeManager.blockInvalidateLimit;
+ final List<Block> toInvalidate = new ArrayList<Block>(limit);
+ final Iterator<Block> it = set.iterator();
+ for(int count = 0; count < limit && it.hasNext(); count++) {
+ toInvalidate.add(it.next());
+ it.remove();
+ }
+ // If we send everything in this message, remove this node entry
+ if (!it.hasNext()) {
+ remove(storageId); // the set is already empty here, so this call leaves numBlocks unchanged
+ }
+
+ dn.addBlocksToBeInvalidated(toInvalidate);
+ numBlocks -= toInvalidate.size();
+ return toInvalidate;
+ }
+}