Posted to hdfs-commits@hadoop.apache.org by cn...@apache.org on 2013/12/13 18:28:18 UTC
svn commit: r1550774 [4/10] - in
/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs: ./
src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/
src/main/java/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apach...
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Fri Dec 13 17:28:14 2013
@@ -25,6 +25,8 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.net.NetworkTopology;
@@ -64,81 +66,87 @@ public class BlockPlacementPolicyWithNod
* @return the chosen node
*/
@Override
- protected DatanodeDescriptor chooseLocalNode(Node localMachine,
+ protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
- List<DatanodeDescriptor> results, boolean avoidStaleNodes)
- throws NotEnoughReplicasException {
+ List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+ StorageType storageType) throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results, avoidStaleNodes);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+ // otherwise try local machine first
if (localMachine instanceof DatanodeDescriptor) {
DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine;
- // otherwise try local machine first
if (excludedNodes.add(localMachine)) { // was not in the excluded list
- if (addIfIsGoodTarget(localDataNode, excludedNodes, blocksize,
- maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
- return localDataNode;
+ for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
+ localDataNode.getStorageInfos())) {
+ if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
+ maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
+ return localStorage;
+ }
}
}
}
// try a node on local node group
- DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
+ DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup(
(NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results, avoidStaleNodes);
- if (chosenNode != null) {
- return chosenNode;
+ blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+ if (chosenStorage != null) {
+ return chosenStorage;
}
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results, avoidStaleNodes);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+ }
+
+ /** @return the node of the second replica */
+ private static DatanodeDescriptor secondNode(Node localMachine,
+ List<DatanodeStorageInfo> results) {
+ // find the second replica
+ for(DatanodeStorageInfo nextStorage : results) {
+ DatanodeDescriptor nextNode = nextStorage.getDatanodeDescriptor();
+ if (nextNode != localMachine) {
+ return nextNode;
+ }
+ }
+ return null;
}
-
@Override
- protected DatanodeDescriptor chooseLocalRack(Node localMachine,
+ protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
- List<DatanodeDescriptor> results, boolean avoidStaleNodes)
- throws NotEnoughReplicasException {
+ List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+ StorageType storageType) throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results,
- avoidStaleNodes);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
// choose one from the local rack, but off-nodegroup
try {
- return chooseRandom(NetworkTopology.getFirstHalf(
- localMachine.getNetworkLocation()),
- excludedNodes, blocksize,
- maxNodesPerRack, results,
- avoidStaleNodes);
+ final String scope = NetworkTopology.getFirstHalf(localMachine.getNetworkLocation());
+ return chooseRandom(scope, excludedNodes, blocksize, maxNodesPerRack,
+ results, avoidStaleNodes, storageType);
} catch (NotEnoughReplicasException e1) {
// find the second replica
- DatanodeDescriptor newLocal=null;
- for(DatanodeDescriptor nextNode : results) {
- if (nextNode != localMachine) {
- newLocal = nextNode;
- break;
- }
- }
+ final DatanodeDescriptor newLocal = secondNode(localMachine, results);
if (newLocal != null) {
try {
return chooseRandom(
clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
- blocksize, maxNodesPerRack, results, avoidStaleNodes);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
}
}
@@ -146,8 +154,9 @@ public class BlockPlacementPolicyWithNod
@Override
protected void chooseRemoteRack(int numOfReplicas,
DatanodeDescriptor localMachine, Set<Node> excludedNodes,
- long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
- boolean avoidStaleNodes) throws NotEnoughReplicasException {
+ long blocksize, int maxReplicasPerRack, List<DatanodeStorageInfo> results,
+ boolean avoidStaleNodes, StorageType storageType)
+ throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
final String rackLocation = NetworkTopology.getFirstHalf(
@@ -155,12 +164,12 @@ public class BlockPlacementPolicyWithNod
try {
// randomly choose from remote racks
chooseRandom(numOfReplicas, "~" + rackLocation, excludedNodes, blocksize,
- maxReplicasPerRack, results, avoidStaleNodes);
+ maxReplicasPerRack, results, avoidStaleNodes, storageType);
} catch (NotEnoughReplicasException e) {
// fall back to the local rack
chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
rackLocation, excludedNodes, blocksize,
- maxReplicasPerRack, results, avoidStaleNodes);
+ maxReplicasPerRack, results, avoidStaleNodes, storageType);
}
}
@@ -170,46 +179,40 @@ public class BlockPlacementPolicyWithNod
* if still no such node is available, choose a random node in the cluster.
* @return the chosen node
*/
- private DatanodeDescriptor chooseLocalNodeGroup(
+ private DatanodeStorageInfo chooseLocalNodeGroup(
NetworkTopologyWithNodeGroup clusterMap, Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
- List<DatanodeDescriptor> results, boolean avoidStaleNodes)
- throws NotEnoughReplicasException {
+ List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+ StorageType storageType) throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results, avoidStaleNodes);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
// choose one from the local node group
try {
return chooseRandom(
clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
- excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
+ excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
+ storageType);
} catch (NotEnoughReplicasException e1) {
- // find the second replica
- DatanodeDescriptor newLocal=null;
- for(DatanodeDescriptor nextNode : results) {
- if (nextNode != localMachine) {
- newLocal = nextNode;
- break;
- }
- }
+ final DatanodeDescriptor newLocal = secondNode(localMachine, results);
if (newLocal != null) {
try {
return chooseRandom(
clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
excludedNodes, blocksize, maxNodesPerRack, results,
- avoidStaleNodes);
+ avoidStaleNodes, storageType);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
}
}
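The chooser methods above now return a DatanodeStorageInfo and thread a StorageType through every call, and the local case iterates the node's storages in shuffled order, taking the first acceptable one. A minimal standalone sketch of that selection pattern, with a hypothetical Storage class and a simplified free-space check standing in for DatanodeStorageInfo and addIfIsGoodTarget:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class LocalStoragePicker {
  // Hypothetical stand-in for DatanodeStorageInfo.
  static class Storage {
    final String id;
    final long remaining;
    Storage(String id, long remaining) { this.id = id; this.remaining = remaining; }
  }

  // Shuffle the candidate storages, then return the first acceptable one,
  // mirroring the DFSUtil.shuffle(...) loop in chooseLocalStorage above.
  static Storage chooseLocal(Storage[] storages, long blocksize) {
    List<Storage> shuffled = Arrays.asList(storages.clone());
    Collections.shuffle(shuffled);
    for (Storage s : shuffled) {
      if (s.remaining >= blocksize) {  // simplified "is good target" check
        return s;
      }
    }
    return null;  // caller falls back to node group, then rack, then cluster
  }

  public static void main(String[] args) {
    Storage[] disks = {
        new Storage("DS-1", 64L << 20),
        new Storage("DS-2", 512L << 20),
    };
    Storage chosen = chooseLocal(disks, 128L << 20);
    System.out.println(chosen == null ? "no local storage" : chosen.id);
  }
}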
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Fri Dec 13 17:28:14 2013
@@ -29,11 +29,11 @@ import org.apache.hadoop.util.LightWeigh
* the datanodes that store the block.
*/
class BlocksMap {
- private static class NodeIterator implements Iterator<DatanodeDescriptor> {
+ private static class StorageIterator implements Iterator<DatanodeStorageInfo> {
private BlockInfo blockInfo;
private int nextIdx = 0;
- NodeIterator(BlockInfo blkInfo) {
+ StorageIterator(BlockInfo blkInfo) {
this.blockInfo = blkInfo;
}
@@ -44,8 +44,8 @@ class BlocksMap {
}
@Override
- public DatanodeDescriptor next() {
- return blockInfo.getDatanode(nextIdx++);
+ public DatanodeStorageInfo next() {
+ return blockInfo.getStorageInfo(nextIdx++);
}
@Override
@@ -115,18 +115,23 @@ class BlocksMap {
/**
* Searches for the block in the BlocksMap and
- * returns Iterator that iterates through the nodes the block belongs to.
+ * returns {@link Iterable} of the storages the block belongs to.
*/
- Iterator<DatanodeDescriptor> nodeIterator(Block b) {
- return nodeIterator(blocks.get(b));
+ Iterable<DatanodeStorageInfo> getStorages(Block b) {
+ return getStorages(blocks.get(b));
}
/**
* For a block that has already been retrieved from the BlocksMap
- * returns Iterator that iterates through the nodes the block belongs to.
+ * returns {@link Iterable} of the storages the block belongs to.
*/
- Iterator<DatanodeDescriptor> nodeIterator(BlockInfo storedBlock) {
- return new NodeIterator(storedBlock);
+ Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
+ return new Iterable<DatanodeStorageInfo>() {
+ @Override
+ public Iterator<DatanodeStorageInfo> iterator() {
+ return new StorageIterator(storedBlock);
+ }
+ };
}
/** counts number of containing nodes. Better than using iterator. */
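getStorages(BlockInfo) above replaces the bare Iterator with an Iterable built around the index-based StorageIterator, so callers can use for-each loops. A standalone sketch of the same wrapping pattern, with a hypothetical BlockRecord standing in for BlockInfo:

import java.util.Iterator;
import java.util.NoSuchElementException;

class IterableView {
  // Hypothetical stand-in for BlockInfo's indexed storage accessors.
  static class BlockRecord {
    private final String[] storages;
    BlockRecord(String... storages) { this.storages = storages; }
    int capacity() { return storages.length; }
    String getStorage(int i) { return storages[i]; }
  }

  // Wrap the index-based cursor in an Iterable, as getStorages(BlockInfo)
  // does above, so callers can write for-each loops over the storages.
  static Iterable<String> storagesOf(final BlockRecord block) {
    return new Iterable<String>() {
      @Override
      public Iterator<String> iterator() {
        return new Iterator<String>() {
          private int nextIdx = 0;
          @Override
          public boolean hasNext() { return nextIdx < block.capacity(); }
          @Override
          public String next() {
            if (!hasNext()) throw new NoSuchElementException();
            return block.getStorage(nextIdx++);
          }
          @Override
          public void remove() { throw new UnsupportedOperationException(); }
        };
      }
    };
  }

  public static void main(String[] args) {
    for (String s : storagesOf(new BlockRecord("DS-1", "DS-2"))) {
      System.out.println(s);
    }
  }
}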
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Fri Dec 13 17:28:14 2013
@@ -18,23 +18,29 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.util.IntrusiveCollection;
import org.apache.hadoop.util.Time;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* This class extends the DatanodeInfo class with ephemeral information (eg
* health, capacity, what blocks are associated with the Datanode) that is
@@ -43,6 +49,7 @@ import com.google.common.annotations.Vis
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DatanodeDescriptor extends DatanodeInfo {
+ public static final Log LOG = LogFactory.getLog(DatanodeDescriptor.class);
public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
// Stores status of decommissioning.
@@ -54,9 +61,9 @@ public class DatanodeDescriptor extends
@InterfaceStability.Evolving
public static class BlockTargetPair {
public final Block block;
- public final DatanodeDescriptor[] targets;
+ public final DatanodeStorageInfo[] targets;
- BlockTargetPair(Block block, DatanodeDescriptor[] targets) {
+ BlockTargetPair(Block block, DatanodeStorageInfo[] targets) {
this.block = block;
this.targets = targets;
}
@@ -99,6 +106,9 @@ public class DatanodeDescriptor extends
}
}
+ private final Map<String, DatanodeStorageInfo> storageMap =
+ new HashMap<String, DatanodeStorageInfo>();
+
/**
* A list of CachedBlock objects on this datanode.
*/
@@ -164,37 +174,11 @@ public class DatanodeDescriptor extends
*/
private long lastCachingDirectiveSentTimeMs;
- /**
- * Head of the list of blocks on the datanode
- */
- private volatile BlockInfo blockList = null;
- /**
- * Number of blocks on the datanode
- */
- private int numBlocks = 0;
-
// isAlive == heartbeats.contains(this)
// This is an optimization, because contains takes O(n) time on Arraylist
public boolean isAlive = false;
public boolean needKeyUpdate = false;
- /**
- * Set to false on any NN failover, and reset to true
- * whenever a block report is received.
- */
- private boolean heartbeatedSinceFailover = false;
-
- /**
- * At startup or at any failover, the DNs in the cluster may
- * have pending block deletions from a previous incarnation
- * of the NameNode. Thus, we consider their block contents
- * stale until we have received a block report. When a DN
- * is considered stale, any replicas on it are transitively
- * considered stale. If any block has at least one stale replica,
- * then no invalidations will be processed for this block.
- * See HDFS-1972.
- */
- private boolean blockContentsStale = true;
// A system administrator can tune the balancer bandwidth parameter
// (dfs.balance.bandwidthPerSec) dynamically by calling
@@ -213,7 +197,7 @@ public class DatanodeDescriptor extends
private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
/* Variables for maintaining number of blocks scheduled to be written to
- * this datanode. This count is approximate and might be slightly bigger
+ * this storage. This count is approximate and might be slightly bigger
* in case of errors (e.g. datanode does not report if an error occurs
* while writing the block).
*/
@@ -223,9 +207,6 @@ public class DatanodeDescriptor extends
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
private int volumeFailures = 0;
- /** Set to false after processing first block report */
- private boolean firstBlockReport = true;
-
/**
* When set to true, the node is not in include list and is not allowed
* to communicate with the namenode
@@ -237,7 +218,8 @@ public class DatanodeDescriptor extends
* @param nodeID id of the data node
*/
public DatanodeDescriptor(DatanodeID nodeID) {
- this(nodeID, 0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
+ super(nodeID);
+ updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
}
/**
@@ -247,104 +229,60 @@ public class DatanodeDescriptor extends
*/
public DatanodeDescriptor(DatanodeID nodeID,
String networkLocation) {
- this(nodeID, networkLocation, 0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
- }
-
- /**
- * DatanodeDescriptor constructor
- * @param nodeID id of the data node
- * @param capacity capacity of the data node
- * @param dfsUsed space used by the data node
- * @param remaining remaining capacity of the data node
- * @param bpused space used by the block pool corresponding to this namenode
- * @param cacheCapacity cache capacity of the data node
- * @param cacheUsed cache used on the data node
- * @param xceiverCount # of data transfers at the data node
- */
- public DatanodeDescriptor(DatanodeID nodeID,
- long capacity,
- long dfsUsed,
- long remaining,
- long bpused,
- long cacheCapacity,
- long cacheUsed,
- int xceiverCount,
- int failedVolumes) {
- super(nodeID);
- updateHeartbeat(capacity, dfsUsed, remaining, bpused, cacheCapacity,
- cacheUsed, xceiverCount, failedVolumes);
- }
-
- /**
- * DatanodeDescriptor constructor
- * @param nodeID id of the data node
- * @param networkLocation location of the data node in network
- * @param capacity capacity of the data node, including space used by non-dfs
- * @param dfsUsed the used space by dfs datanode
- * @param remaining remaining capacity of the data node
- * @param bpused space used by the block pool corresponding to this namenode
- * @param cacheCapacity cache capacity of the data node
- * @param cacheUsed cache used on the data node
- * @param xceiverCount # of data transfers at the data node
- */
- public DatanodeDescriptor(DatanodeID nodeID,
- String networkLocation,
- long capacity,
- long dfsUsed,
- long remaining,
- long bpused,
- long cacheCapacity,
- long cacheUsed,
- int xceiverCount,
- int failedVolumes) {
super(nodeID, networkLocation);
- updateHeartbeat(capacity, dfsUsed, remaining, bpused, cacheCapacity,
- cacheUsed, xceiverCount, failedVolumes);
+ updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
}
/**
- * Add datanode to the block.
- * Add block to the head of the list of blocks belonging to the data-node.
+ * Add data-node to the block. Add block to the head of the list of blocks
+ * belonging to the data-node.
*/
- public boolean addBlock(BlockInfo b) {
- if(!b.addNode(this))
- return false;
- // add to the head of the data-node list
- blockList = b.listInsert(blockList, this);
- numBlocks++;
- return true;
+ public boolean addBlock(String storageID, BlockInfo b) {
+ DatanodeStorageInfo s = getStorageInfo(storageID);
+ if (s != null) {
+ return s.addBlock(b);
+ }
+ return false;
}
-
- /**
- * Remove block from the list of blocks belonging to the data-node.
- * Remove datanode from the block.
- */
- public boolean removeBlock(BlockInfo b) {
- blockList = b.listRemove(blockList, this);
- if ( b.removeNode(this) ) {
- numBlocks--;
- return true;
- } else {
- return false;
+
+ DatanodeStorageInfo getStorageInfo(String storageID) {
+ synchronized (storageMap) {
+ return storageMap.get(storageID);
+ }
+ }
+ DatanodeStorageInfo[] getStorageInfos() {
+ synchronized (storageMap) {
+ final Collection<DatanodeStorageInfo> storages = storageMap.values();
+ return storages.toArray(new DatanodeStorageInfo[storages.size()]);
}
}
/**
- * Move block to the head of the list of blocks belonging to the data-node.
- * @return the index of the head of the blockList
+ * Remove block from the list of blocks belonging to the data-node. Remove
+ * data-node from the block.
*/
- int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
- blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
- return curIndex;
+ boolean removeBlock(BlockInfo b) {
+ int index = b.findStorageInfo(this);
+ // if block exists on this datanode
+ if (index >= 0) {
+ DatanodeStorageInfo s = b.getStorageInfo(index);
+ if (s != null) {
+ return s.removeBlock(b);
+ }
+ }
+ return false;
}
-
+
/**
- * Used for testing only
- * @return the head of the blockList
+ * Remove block from the list of blocks belonging to the data-node. Remove
+ * data-node from the block.
*/
- @VisibleForTesting
- protected BlockInfo getHead(){
- return blockList;
+ boolean removeBlock(String storageID, BlockInfo b) {
+ DatanodeStorageInfo s = getStorageInfo(storageID);
+ if (s != null) {
+ return s.removeBlock(b);
+ }
+ return false;
}
/**
@@ -355,9 +293,12 @@ public class DatanodeDescriptor extends
* @return the new block
*/
public BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) {
- boolean done = removeBlock(oldBlock);
+ int index = oldBlock.findStorageInfo(this);
+ DatanodeStorageInfo s = oldBlock.getStorageInfo(index);
+ boolean done = s.removeBlock(oldBlock);
assert done : "Old block should belong to the data-node when replacing";
- done = addBlock(newBlock);
+
+ done = s.addBlock(newBlock);
assert done : "New block should not belong to the data-node when replacing";
return newBlock;
}
@@ -368,7 +309,6 @@ public class DatanodeDescriptor extends
setBlockPoolUsed(0);
setDfsUsed(0);
setXceiverCount(0);
- this.blockList = null;
this.invalidateBlocks.clear();
this.volumeFailures = 0;
// pendingCached, cached, and pendingUncached are protected by the
@@ -392,66 +332,97 @@ public class DatanodeDescriptor extends
}
public int numBlocks() {
- return numBlocks;
+ int blocks = 0;
+ for (DatanodeStorageInfo entry : getStorageInfos()) {
+ blocks += entry.numBlocks();
+ }
+ return blocks;
}
/**
* Updates stats from datanode heartbeat.
*/
- public void updateHeartbeat(long capacity, long dfsUsed, long remaining,
- long blockPoolUsed, long cacheCapacity, long cacheUsed, int xceiverCount,
- int volFailures) {
- setCapacity(capacity);
- setRemaining(remaining);
- setBlockPoolUsed(blockPoolUsed);
- setDfsUsed(dfsUsed);
+ public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
+ long cacheUsed, int xceiverCount, int volFailures) {
+ long totalCapacity = 0;
+ long totalRemaining = 0;
+ long totalBlockPoolUsed = 0;
+ long totalDfsUsed = 0;
+
setCacheCapacity(cacheCapacity);
setCacheUsed(cacheUsed);
setXceiverCount(xceiverCount);
setLastUpdate(Time.now());
this.volumeFailures = volFailures;
- this.heartbeatedSinceFailover = true;
+ for (StorageReport report : reports) {
+ DatanodeStorageInfo storage = storageMap.get(report.getStorageID());
+ if (storage == null) {
+ // This is seen during cluster initialization when the heartbeat
+ // is received before the initial block reports from each storage.
+ storage = updateStorage(new DatanodeStorage(report.getStorageID()));
+ }
+ storage.receivedHeartbeat(report);
+ totalCapacity += report.getCapacity();
+ totalRemaining += report.getRemaining();
+ totalBlockPoolUsed += report.getBlockPoolUsed();
+ totalDfsUsed += report.getDfsUsed();
+ }
rollBlocksScheduled(getLastUpdate());
+
+ // Update total metrics for the node.
+ setCapacity(totalCapacity);
+ setRemaining(totalRemaining);
+ setBlockPoolUsed(totalBlockPoolUsed);
+ setDfsUsed(totalDfsUsed);
}
- /**
- * Iterates over the list of blocks belonging to the datanode.
- */
- public static class BlockIterator implements Iterator<BlockInfo> {
- private BlockInfo current;
- private DatanodeDescriptor node;
-
- BlockIterator(BlockInfo head, DatanodeDescriptor dn) {
- this.current = head;
- this.node = dn;
+ private static class BlockIterator implements Iterator<BlockInfo> {
+ private int index = 0;
+ private final List<Iterator<BlockInfo>> iterators;
+
+ private BlockIterator(final DatanodeStorageInfo... storages) {
+ List<Iterator<BlockInfo>> iterators = new ArrayList<Iterator<BlockInfo>>();
+ for (DatanodeStorageInfo e : storages) {
+ iterators.add(e.getBlockIterator());
+ }
+ this.iterators = Collections.unmodifiableList(iterators);
}
@Override
public boolean hasNext() {
- return current != null;
+ update();
+ return !iterators.isEmpty() && iterators.get(index).hasNext();
}
@Override
public BlockInfo next() {
- BlockInfo res = current;
- current = current.getNext(current.findDatanode(node));
- return res;
+ update();
+ return iterators.get(index).next();
}
-
+
@Override
- public void remove() {
- throw new UnsupportedOperationException("Sorry. can't remove.");
+ public void remove() {
+ throw new UnsupportedOperationException("Remove unsupported.");
+ }
+
+ private void update() {
+ while(index < iterators.size() - 1 && !iterators.get(index).hasNext()) {
+ index++;
+ }
}
}
- public Iterator<BlockInfo> getBlockIterator() {
- return new BlockIterator(this.blockList, this);
+ Iterator<BlockInfo> getBlockIterator() {
+ return new BlockIterator(getStorageInfos());
+ }
+ Iterator<BlockInfo> getBlockIterator(final String storageID) {
+ return new BlockIterator(getStorageInfo(storageID));
}
/**
* Store block replication work.
*/
- void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
+ void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
assert(block != null && targets != null && targets.length > 0);
replicateBlocks.offer(new BlockTargetPair(block, targets));
}
@@ -526,18 +497,14 @@ public class DatanodeDescriptor extends
public int getBlocksScheduled() {
return currApproxBlocksScheduled + prevApproxBlocksScheduled;
}
-
- /**
- * Increments counter for number of blocks scheduled.
- */
- public void incBlocksScheduled() {
+
+ /** Increment the number of blocks scheduled. */
+ void incrementBlocksScheduled() {
currApproxBlocksScheduled++;
}
- /**
- * Decrements counter for number of blocks scheduled.
- */
- void decBlocksScheduled() {
+ /** Decrement the number of blocks scheduled. */
+ void decrementBlocksScheduled() {
if (prevApproxBlocksScheduled > 0) {
prevApproxBlocksScheduled--;
} else if (currApproxBlocksScheduled > 0) {
@@ -546,12 +513,9 @@ public class DatanodeDescriptor extends
// its ok if both counters are zero.
}
- /**
- * Adjusts curr and prev number of blocks scheduled every few minutes.
- */
+ /** Adjusts curr and prev number of blocks scheduled every few minutes. */
private void rollBlocksScheduled(long now) {
- if ((now - lastBlocksScheduledRollTime) >
- BLOCKS_SCHEDULED_ROLL_INTERVAL) {
+ if (now - lastBlocksScheduledRollTime > BLOCKS_SCHEDULED_ROLL_INTERVAL) {
prevApproxBlocksScheduled = currApproxBlocksScheduled;
currApproxBlocksScheduled = 0;
lastBlocksScheduledRollTime = now;
@@ -647,7 +611,11 @@ public class DatanodeDescriptor extends
@Override
public void updateRegInfo(DatanodeID nodeReg) {
super.updateRegInfo(nodeReg);
- firstBlockReport = true; // must re-process IBR after re-registration
+
+ // must re-process IBR after re-registration
+ for(DatanodeStorageInfo storage : getStorageInfos()) {
+ storage.setBlockReportCount(0);
+ }
}
/**
@@ -664,26 +632,6 @@ public class DatanodeDescriptor extends
this.bandwidth = bandwidth;
}
- public boolean areBlockContentsStale() {
- return blockContentsStale;
- }
-
- public void markStaleAfterFailover() {
- heartbeatedSinceFailover = false;
- blockContentsStale = true;
- }
-
- public void receivedBlockReport() {
- if (heartbeatedSinceFailover) {
- blockContentsStale = false;
- }
- firstBlockReport = false;
- }
-
- boolean isFirstBlockReport() {
- return firstBlockReport;
- }
-
@Override
public String dumpDatanode() {
StringBuilder sb = new StringBuilder(super.dumpDatanode());
@@ -702,6 +650,19 @@ public class DatanodeDescriptor extends
return sb.toString();
}
+ DatanodeStorageInfo updateStorage(DatanodeStorage s) {
+ synchronized (storageMap) {
+ DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
+ if (storage == null) {
+ LOG.info("Adding new storage ID " + s.getStorageID() +
+ " for DN " + getXferAddr());
+ storage = new DatanodeStorageInfo(this, s);
+ storageMap.put(s.getStorageID(), storage);
+ }
+ return storage;
+ }
+ }
+
/**
* @return The time at which we last sent caching directives to this
* DataNode, in monotonic milliseconds.
@@ -718,3 +679,4 @@ public class DatanodeDescriptor extends
this.lastCachingDirectiveSentTimeMs = time;
}
}
+
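The rewritten BlockIterator above no longer walks a per-datanode linked list; it chains the block iterators of all storages, advancing an index past exhausted iterators in update(). A generic standalone sketch of that chaining pattern (the per-storage iterators here are plain list iterators, not real block lists):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class ChainedIterator<T> implements Iterator<T> {
  private int index = 0;
  private final List<Iterator<T>> iterators;

  // Chain several per-storage iterators into one view, as the rewritten
  // DatanodeDescriptor.BlockIterator does above.
  ChainedIterator(List<Iterator<T>> iterators) {
    this.iterators = Collections.unmodifiableList(
        new ArrayList<Iterator<T>>(iterators));
  }

  @Override
  public boolean hasNext() {
    update();
    return !iterators.isEmpty() && iterators.get(index).hasNext();
  }

  @Override
  public T next() {
    update();
    return iterators.get(index).next();
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException("Remove unsupported.");
  }

  // Skip past exhausted iterators, but never run off the end of the list.
  private void update() {
    while (index < iterators.size() - 1 && !iterators.get(index).hasNext()) {
      index++;
    }
  }

  public static void main(String[] args) {
    List<Iterator<String>> parts = new ArrayList<Iterator<String>>();
    parts.add(Arrays.asList("b1", "b2").iterator());
    parts.add(Collections.<String>emptyList().iterator());
    parts.add(Arrays.asList("b3").iterator());
    for (Iterator<String> it = new ChainedIterator<String>(parts); it.hasNext();) {
      System.out.println(it.next());
    }
  }
}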
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Dec 13 17:28:14 2013
@@ -424,9 +424,13 @@ public class DatanodeManager {
}
- /** Get a datanode descriptor given corresponding storageID */
- DatanodeDescriptor getDatanode(final String storageID) {
- return datanodeMap.get(storageID);
+ /** Get a datanode descriptor given corresponding DatanodeUUID */
+ DatanodeDescriptor getDatanode(final String datanodeUuid) {
+ if (datanodeUuid == null) {
+ return null;
+ }
+
+ return datanodeMap.get(datanodeUuid);
}
/**
@@ -438,7 +442,7 @@ public class DatanodeManager {
*/
public DatanodeDescriptor getDatanode(DatanodeID nodeID
) throws UnregisteredNodeException {
- final DatanodeDescriptor node = getDatanode(nodeID.getStorageID());
+ final DatanodeDescriptor node = getDatanode(nodeID.getDatanodeUuid());
if (node == null)
return null;
if (!node.getXferAddr().equals(nodeID.getXferAddr())) {
@@ -451,6 +455,20 @@ public class DatanodeManager {
return node;
}
+ public DatanodeStorageInfo[] getDatanodeStorageInfos(
+ DatanodeID[] datanodeID, String[] storageIDs)
+ throws UnregisteredNodeException {
+ if (datanodeID.length == 0) {
+ return null;
+ }
+ final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[datanodeID.length];
+ for(int i = 0; i < datanodeID.length; i++) {
+ final DatanodeDescriptor dd = getDatanode(datanodeID[i]);
+ storages[i] = dd.getStorageInfo(storageIDs[i]);
+ }
+ return storages;
+ }
+
/** Prints information about all datanodes. */
void datanodeDump(final PrintWriter out) {
synchronized (datanodeMap) {
@@ -528,7 +546,7 @@ public class DatanodeManager {
// remove from host2DatanodeMap the datanodeDescriptor removed
// from datanodeMap before adding node to host2DatanodeMap.
synchronized(datanodeMap) {
- host2DatanodeMap.remove(datanodeMap.put(node.getStorageID(), node));
+ host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
}
networktopology.add(node); // may throw InvalidTopologyException
@@ -543,7 +561,7 @@ public class DatanodeManager {
/** Physically remove node from datanodeMap. */
private void wipeDatanode(final DatanodeID node) {
- final String key = node.getStorageID();
+ final String key = node.getDatanodeUuid();
synchronized (datanodeMap) {
host2DatanodeMap.remove(datanodeMap.remove(key));
}
@@ -705,8 +723,10 @@ public class DatanodeManager {
/** Start decommissioning the specified datanode. */
private void startDecommission(DatanodeDescriptor node) {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
- LOG.info("Start Decommissioning " + node + " with " +
- node.numBlocks() + " blocks");
+ for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+ LOG.info("Start Decommissioning " + node + " " + storage
+ + " with " + storage.numBlocks() + " blocks");
+ }
heartbeatManager.startDecommission(node);
node.decommissioningStatus.setStartTime(now());
@@ -729,24 +749,6 @@ public class DatanodeManager {
}
/**
- * Generate new storage ID.
- *
- * @return unique storage ID
- *
- * Note: that collisions are still possible if somebody will try
- * to bring in a data storage from a different cluster.
- */
- private String newStorageID() {
- String newID = null;
- while(newID == null) {
- newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
- if (datanodeMap.get(newID) != null)
- newID = null;
- }
- return newID;
- }
-
- /**
* Register the given datanode with the namenode. NB: the given
* registration is mutated and given back to the datanode.
*
@@ -784,9 +786,9 @@ public class DatanodeManager {
}
NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
- + nodeReg + " storage " + nodeReg.getStorageID());
+ + nodeReg + " storage " + nodeReg.getDatanodeUuid());
- DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
+ DatanodeDescriptor nodeS = getDatanode(nodeReg.getDatanodeUuid());
DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
nodeReg.getIpAddr(), nodeReg.getXferPort());
@@ -821,7 +823,7 @@ public class DatanodeManager {
*/
NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
+ " is replaced by " + nodeReg + " with the same storageID "
- + nodeReg.getStorageID());
+ + nodeReg.getDatanodeUuid());
}
boolean success = false;
@@ -853,20 +855,8 @@ public class DatanodeManager {
}
}
return;
- }
-
- // this is a new datanode serving a new data storage
- if ("".equals(nodeReg.getStorageID())) {
- // this data storage has never been registered
- // it is either empty or was created by pre-storageID version of DFS
- nodeReg.setStorageID(newStorageID());
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.registerDatanode: "
- + "new storageID " + nodeReg.getStorageID() + " assigned.");
- }
}
-
+
DatanodeDescriptor nodeDescr
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
boolean success = false;
@@ -1234,10 +1224,10 @@ public class DatanodeManager {
/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
- final String blockPoolId,
- long capacity, long dfsUsed, long remaining, long blockPoolUsed,
- long cacheCapacity, long cacheUsed, int xceiverCount, int maxTransfers,
- int failedVolumes) throws IOException {
+ StorageReport[] reports, final String blockPoolId,
+ long cacheCapacity, long cacheUsed, int xceiverCount,
+ int maxTransfers, int failedVolumes
+ ) throws IOException {
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
@@ -1257,9 +1247,9 @@ public class DatanodeManager {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
- heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
- remaining, blockPoolUsed, cacheCapacity, cacheUsed, xceiverCount,
- failedVolumes);
+ heartbeatManager.updateHeartbeat(nodeinfo, reports,
+ cacheCapacity, cacheUsed,
+ xceiverCount, failedVolumes);
// If we are in safemode, do not send back any recovery / replication
// requests. Don't even drain the existing queue of work.
@@ -1274,32 +1264,32 @@ public class DatanodeManager {
BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
blocks.length);
for (BlockInfoUnderConstruction b : blocks) {
- DatanodeDescriptor[] expectedLocations = b.getExpectedLocations();
+ final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
// Skip stale nodes during recovery - not heart beated for some time (30s by default).
- List<DatanodeDescriptor> recoveryLocations =
- new ArrayList<DatanodeDescriptor>(expectedLocations.length);
- for (int i = 0; i < expectedLocations.length; i++) {
- if (!expectedLocations[i].isStale(this.staleInterval)) {
- recoveryLocations.add(expectedLocations[i]);
+ final List<DatanodeStorageInfo> recoveryLocations =
+ new ArrayList<DatanodeStorageInfo>(storages.length);
+ for (int i = 0; i < storages.length; i++) {
+ if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
+ recoveryLocations.add(storages[i]);
}
}
// If we only get 1 replica after eliminating stale nodes, then choose all
// replicas for recovery and let the primary data node handle failures.
if (recoveryLocations.size() > 1) {
- if (recoveryLocations.size() != expectedLocations.length) {
+ if (recoveryLocations.size() != storages.length) {
LOG.info("Skipped stale nodes for recovery : " +
- (expectedLocations.length - recoveryLocations.size()));
+ (storages.length - recoveryLocations.size()));
}
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b),
- recoveryLocations.toArray(new DatanodeDescriptor[recoveryLocations.size()]),
+ DatanodeStorageInfo.toDatanodeInfos(recoveryLocations),
b.getBlockRecoveryId()));
} else {
// If too many replicas are stale, then choose all replicas to participate
// in block recovery.
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b),
- expectedLocations,
+ DatanodeStorageInfo.toDatanodeInfos(storages),
b.getBlockRecoveryId()));
}
}
@@ -1416,7 +1406,9 @@ public class DatanodeManager {
LOG.info("Marking all datandoes as stale");
synchronized (datanodeMap) {
for (DatanodeDescriptor dn : datanodeMap.values()) {
- dn.markStaleAfterFailover();
+ for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
+ storage.markStaleAfterFailover();
+ }
}
}
}
@@ -1455,3 +1447,4 @@ public class DatanodeManager {
this.shouldSendCachingCommands = shouldSendCachingCommands;
}
}
+
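With datanodeMap now keyed by datanode UUID, locating a replica becomes a two-level lookup: UUID to descriptor, then storage ID to storage, as in getDatanodeStorageInfos above. A minimal sketch of that lookup shape, with plain maps and strings standing in for DatanodeDescriptor and DatanodeStorageInfo:

import java.util.HashMap;
import java.util.Map;

class StorageLookup {
  // Hypothetical stand-in for DatanodeDescriptor with its storageMap.
  static class Node {
    final Map<String, String> storages = new HashMap<String, String>();
  }

  private final Map<String, Node> datanodeMap = new HashMap<String, Node>();

  // Resolve (datanodeUuid, storageID) pairs to storages, mirroring
  // DatanodeManager.getDatanodeStorageInfos above.
  String[] getStorages(String[] datanodeUuids, String[] storageIDs) {
    if (datanodeUuids.length == 0) {
      return null;
    }
    final String[] result = new String[datanodeUuids.length];
    for (int i = 0; i < datanodeUuids.length; i++) {
      Node node = datanodeMap.get(datanodeUuids[i]);  // level 1: node by UUID
      result[i] = node == null ? null : node.storages.get(storageIDs[i]);
    }
    return result;
  }

  public static void main(String[] args) {
    StorageLookup lookup = new StorageLookup();
    Node n = new Node();
    n.storages.put("DS-1", "storage DS-1 on dn-a");
    lookup.datanodeMap.put("dn-a", n);
    System.out.println(lookup.getStorages(
        new String[] {"dn-a"}, new String[] {"DS-1"})[0]);
  }
}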
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Fri Dec 13 17:28:14 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
@@ -181,7 +182,7 @@ class HeartbeatManager implements Datano
addDatanode(d);
//update its timestamp
- d.updateHeartbeat(0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
+ d.updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
}
}
@@ -203,11 +204,11 @@ class HeartbeatManager implements Datano
}
synchronized void updateHeartbeat(final DatanodeDescriptor node,
- long capacity, long dfsUsed, long remaining, long blockPoolUsed,
- long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes) {
+ StorageReport[] reports, long cacheCapacity, long cacheUsed,
+ int xceiverCount, int failedVolumes) {
stats.subtract(node);
- node.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
- cacheCapacity, cacheUsed, xceiverCount, failedVolumes);
+ node.updateHeartbeat(reports, cacheCapacity, cacheUsed,
+ xceiverCount, failedVolumes);
stats.add(node);
}
@@ -358,3 +359,4 @@ class HeartbeatManager implements Datano
}
}
}
+
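updateHeartbeat above keeps the aggregate stats consistent by subtracting the node's old contribution, applying the new report, and adding the node back. A tiny standalone sketch of that subtract-update-add pattern, with a single capacity field standing in for the full stats set:

class StatsRollup {
  private long totalCapacity = 0;

  static class Node {
    long capacity;
    Node(long capacity) { this.capacity = capacity; }
  }

  // Keep the aggregate consistent across an in-place node update, as
  // HeartbeatManager.updateHeartbeat does with stats.subtract/add above.
  synchronized void updateNode(Node node, long newCapacity) {
    totalCapacity -= node.capacity;  // subtract the old contribution
    node.capacity = newCapacity;     // apply the heartbeat update
    totalCapacity += node.capacity;  // add the new contribution back
  }

  public static void main(String[] args) {
    StatsRollup stats = new StatsRollup();
    Node n = new Node(100);
    stats.totalCapacity = n.capacity;
    stats.updateNode(n, 250);
    System.out.println(stats.totalCapacity);  // 250
  }
}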
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Fri Dec 13 17:28:14 2013
@@ -78,10 +78,10 @@ class InvalidateBlocks {
*/
synchronized void add(final Block block, final DatanodeInfo datanode,
final boolean log) {
- LightWeightHashSet<Block> set = node2blocks.get(datanode.getStorageID());
+ LightWeightHashSet<Block> set = node2blocks.get(datanode.getDatanodeUuid());
if (set == null) {
set = new LightWeightHashSet<Block>();
- node2blocks.put(datanode.getStorageID(), set);
+ node2blocks.put(datanode.getDatanodeUuid(), set);
}
if (set.add(block)) {
numBlocks++;
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java Fri Dec 13 17:28:14 2013
@@ -34,5 +34,5 @@ public interface MutableBlockCollection
* and set the locations.
*/
public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
- DatanodeDescriptor[] locations) throws IOException;
+ DatanodeStorageInfo[] storages) throws IOException;
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Fri Dec 13 17:28:14 2013
@@ -42,11 +42,13 @@ class PendingDataNodeMessages {
static class ReportedBlockInfo {
private final Block block;
private final DatanodeDescriptor dn;
+ private final String storageID;
private final ReplicaState reportedState;
- ReportedBlockInfo(DatanodeDescriptor dn, Block block,
+ ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block,
ReplicaState reportedState) {
this.dn = dn;
+ this.storageID = storageID;
this.block = block;
this.reportedState = reportedState;
}
@@ -58,6 +60,10 @@ class PendingDataNodeMessages {
DatanodeDescriptor getNode() {
return dn;
}
+
+ String getStorageID() {
+ return storageID;
+ }
ReplicaState getReportedState() {
return reportedState;
@@ -70,11 +76,11 @@ class PendingDataNodeMessages {
}
}
- void enqueueReportedBlock(DatanodeDescriptor dn, Block block,
+ void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
ReplicaState reportedState) {
block = new Block(block);
getBlockQueue(block).add(
- new ReportedBlockInfo(dn, block, reportedState));
+ new ReportedBlockInfo(dn, storageID, block, reportedState));
count++;
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Fri Dec 13 17:28:14 2013
@@ -236,6 +236,8 @@ public abstract class Storage extends St
final boolean useLock; // flag to enable storage lock
final StorageDirType dirType; // storage dir type
FileLock lock; // storage lock
+
+ private String storageUuid = null; // Storage directory identifier.
public StorageDirectory(File dir) {
// default dirType is null
@@ -246,6 +248,14 @@ public abstract class Storage extends St
this(dir, dirType, true);
}
+ public void setStorageUuid(String storageUuid) {
+ this.storageUuid = storageUuid;
+ }
+
+ public String getStorageUuid() {
+ return storageUuid;
+ }
+
/**
* Constructor
* @param dir directory corresponding to the storage
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Fri Dec 13 17:28:14 2013
@@ -27,6 +27,7 @@ import java.util.concurrent.CopyOnWriteA
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -160,31 +161,32 @@ class BPOfferService {
synchronized NamespaceInfo getNamespaceInfo() {
return bpNSInfo;
}
-
+
@Override
public String toString() {
if (bpNSInfo == null) {
// If we haven't yet connected to our NN, we don't yet know our
// own block pool ID.
// If _none_ of the block pools have connected yet, we don't even
- // know the storage ID of this DN.
- String storageId = dn.getStorageId();
- if (storageId == null || "".equals(storageId)) {
- storageId = "unknown";
+ // know the Datanode UUID of this DN.
+ String datanodeUuid = dn.getDatanodeUuid();
+
+ if (datanodeUuid == null || datanodeUuid.isEmpty()) {
+ datanodeUuid = "unassigned";
}
- return "Block pool <registering> (storage id " + storageId +
- ")";
+ return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
} else {
return "Block pool " + getBlockPoolId() +
- " (storage id " + dn.getStorageId() +
- ")";
+ " (Datanode Uuid " + dn.getDatanodeUuid() +
+ ")";
}
}
- void reportBadBlocks(ExtendedBlock block) {
+ void reportBadBlocks(ExtendedBlock block,
+ String storageUuid, StorageType storageType) {
checkBlock(block);
for (BPServiceActor actor : bpServices) {
- actor.reportBadBlocks(block);
+ actor.reportBadBlocks(block, storageUuid, storageType);
}
}
@@ -193,7 +195,8 @@ class BPOfferService {
* till namenode is informed before responding with success to the
* client? For now we don't.
*/
- void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+ void notifyNamenodeReceivedBlock(
+ ExtendedBlock block, String delHint, String storageUuid) {
checkBlock(block);
checkDelHint(delHint);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
@@ -202,7 +205,7 @@ class BPOfferService {
delHint);
for (BPServiceActor actor : bpServices) {
- actor.notifyNamenodeBlockImmediately(bInfo);
+ actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
}
}
@@ -219,23 +222,23 @@ class BPOfferService {
"delHint is null");
}
- void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+ void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
checkBlock(block);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null);
for (BPServiceActor actor : bpServices) {
- actor.notifyNamenodeDeletedBlock(bInfo);
+ actor.notifyNamenodeDeletedBlock(bInfo, storageUuid);
}
}
- void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+ void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
checkBlock(block);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null);
for (BPServiceActor actor : bpServices) {
- actor.notifyNamenodeBlockImmediately(bInfo);
+ actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
}
}
@@ -274,12 +277,22 @@ class BPOfferService {
synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
if (this.bpNSInfo == null) {
this.bpNSInfo = nsInfo;
-
+ boolean success = false;
+
// Now that we know the namespace ID, etc, we can pass this to the DN.
// The DN can now initialize its local storage if we are the
// first BP to handshake, etc.
- dn.initBlockPool(this);
- return;
+ try {
+ dn.initBlockPool(this);
+ success = true;
+ } finally {
+ if (!success) {
+ // The datanode failed to initialize the BP. We need to reset
+ // the namespace info so that other BPService actors still have
+ // a chance to set it, and re-initialize the datanode.
+ this.bpNSInfo = null;
+ }
+ }
} else {
checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
"Blockpool ID");
@@ -328,7 +341,7 @@ class BPOfferService {
}
}
- synchronized DatanodeRegistration createRegistration() {
+ synchronized DatanodeRegistration createRegistration() throws IOException {
Preconditions.checkState(bpNSInfo != null,
"getRegistration() can only be called after initial handshake");
return dn.createBPRegistration(bpNSInfo);
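The verifyAndSetNamespaceInfo change above records bpNSInfo before initializing the block pool, then resets it in a finally block if initialization throws, leaving the slot open for another BPServiceActor to retry. A standalone sketch of that set-then-roll-back pattern (initBlockPool here is an empty stand-in for DataNode.initBlockPool):

import java.io.IOException;

class HandshakeState {
  private Object nsInfo = null;

  // Record the namespace info only if initialization succeeds; on failure,
  // reset it so another service actor can retry, mirroring the
  // verifyAndSetNamespaceInfo change above.
  synchronized void verifyAndSet(Object info) throws IOException {
    if (nsInfo == null) {
      nsInfo = info;
      boolean success = false;
      try {
        initBlockPool();  // may throw
        success = true;
      } finally {
        if (!success) {
          nsInfo = null;  // leave the slot open for a retry
        }
      }
    }
  }

  private void initBlockPool() throws IOException {
    // stand-in for DataNode.initBlockPool(this)
  }

  public static void main(String[] args) throws IOException {
    new HandshakeState().verifyAndSet(new Object());
    System.out.println("initialized");
  }
}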
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Fri Dec 13 17:28:14 2013
@@ -22,7 +22,6 @@ import static org.apache.hadoop.util.Tim
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
-import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -31,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
@@ -100,9 +99,9 @@ class BPServiceActor implements Runnable
* keyed by block ID, contains the pending changes which have yet to be
* reported to the NN. Access should be synchronized on this object.
*/
- private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR
- = Maps.newHashMap();
-
+ private final Map<String, PerStoragePendingIncrementalBR>
+ pendingIncrementalBRperStorage = Maps.newHashMap();
+
private volatile int pendingReceivedRequests = 0;
private volatile boolean shouldServiceRun = true;
private final DataNode dn;
@@ -244,12 +243,15 @@ class BPServiceActor implements Runnable
resetBlockReportTime = true; // reset future BRs for randomness
}
- void reportBadBlocks(ExtendedBlock block) {
+ void reportBadBlocks(ExtendedBlock block,
+ String storageUuid, StorageType storageType) {
if (bpRegistration == null) {
return;
}
DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
- LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) };
+ String[] uuids = { storageUuid };
+ StorageType[] types = { storageType };
+ LocatedBlock[] blocks = { new LocatedBlock(block, dnArr, uuids, types) };
try {
bpNamenode.reportBadBlocks(blocks);
@@ -263,69 +265,120 @@ class BPServiceActor implements Runnable
}
/**
- * Report received blocks and delete hints to the Namenode
- *
+ * Report received blocks and delete hints to the Namenode for each
+ * storage.
+ *
* @throws IOException
*/
private void reportReceivedDeletedBlocks() throws IOException {
- // check if there are newly received blocks
- ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
- synchronized (pendingIncrementalBR) {
- int numBlocks = pendingIncrementalBR.size();
- if (numBlocks > 0) {
- //
- // Send newly-received and deleted blockids to namenode
- //
- receivedAndDeletedBlockArray = pendingIncrementalBR
- .values().toArray(new ReceivedDeletedBlockInfo[numBlocks]);
+ // Generate a list of the pending reports for each storage under the lock
+ Map<String, ReceivedDeletedBlockInfo[]> blockArrays = Maps.newHashMap();
+ synchronized (pendingIncrementalBRperStorage) {
+ for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
+ pendingIncrementalBRperStorage.entrySet()) {
+ final String storageUuid = entry.getKey();
+ final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
+
+ if (perStorageMap.getBlockInfoCount() > 0) {
+ // Send newly-received and deleted blockids to namenode
+ ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
+ pendingReceivedRequests =
+ (pendingReceivedRequests > rdbi.length ?
+ (pendingReceivedRequests - rdbi.length) : 0);
+ blockArrays.put(storageUuid, rdbi);
+ }
}
- pendingIncrementalBR.clear();
}
- if (receivedAndDeletedBlockArray != null) {
+
+ // Send incremental block reports to the Namenode outside the lock
+ for (Map.Entry<String, ReceivedDeletedBlockInfo[]> entry :
+ blockArrays.entrySet()) {
+ final String storageUuid = entry.getKey();
+ final ReceivedDeletedBlockInfo[] rdbi = entry.getValue();
+
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
- bpRegistration.getStorageID(), receivedAndDeletedBlockArray) };
+ storageUuid, rdbi) };
boolean success = false;
try {
- bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
- report);
+ bpNamenode.blockReceivedAndDeleted(bpRegistration,
+ bpos.getBlockPoolId(), report);
success = true;
} finally {
- synchronized (pendingIncrementalBR) {
- if (!success) {
+ if (!success) {
+ synchronized (pendingIncrementalBRperStorage) {
// If we didn't succeed in sending the report, put all of the
- // blocks back onto our queue, but only in the case where we didn't
- // put something newer in the meantime.
- for (ReceivedDeletedBlockInfo rdbi : receivedAndDeletedBlockArray) {
- if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
- pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
- }
- }
+ // blocks back onto our queue, but only in the case where we
+ // didn't put something newer in the meantime.
+ PerStoragePendingIncrementalBR perStorageMap =
+ pendingIncrementalBRperStorage.get(storageUuid);
+ pendingReceivedRequests += perStorageMap.putMissingBlockInfos(rdbi);
}
- pendingReceivedRequests = pendingIncrementalBR.size();
}
}
}
}
+ /**
+ * Retrieve the incremental BR state for a given storage UUID, creating
+ * it on first access.
+ * @param storageUuid the storage to look up
+ * @return the pending incremental block report state for that storage
+ */
+ private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
+ String storageUuid) {
+ PerStoragePendingIncrementalBR mapForStorage =
+ pendingIncrementalBRperStorage.get(storageUuid);
+
+ if (mapForStorage == null) {
+ // This is the first time we are adding incremental BR state for
+ // this storage so create a new map. This is required once per
+ // storage, per service actor.
+ mapForStorage = new PerStoragePendingIncrementalBR();
+ pendingIncrementalBRperStorage.put(storageUuid, mapForStorage);
+ }
+
+ return mapForStorage;
+ }
+
+ /**
+ * Add a blockInfo for notification to the NameNode. If another entry
+ * exists for the same block, it is removed first.
+ *
+ * Caller must synchronize access using pendingIncrementalBRperStorage.
+ * @param bInfo the block state to report
+ * @param storageUuid the storage that now holds the block
+ */
+ void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo,
+ String storageUuid) {
+ // Make sure any existing entry for the same block is removed first.
+ // At most one such entry can exist.
+ for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
+ pendingIncrementalBRperStorage.entrySet()) {
+ if (entry.getValue().removeBlockInfo(bInfo)) {
+ break;
+ }
+ }
+ getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
+ }
+
/*
* Informing the name node could take a long long time! Should we wait
* till namenode is informed before responding with success to the
* client? For now we don't.
*/
- void notifyNamenodeBlockImmediately(ReceivedDeletedBlockInfo bInfo) {
- synchronized (pendingIncrementalBR) {
- pendingIncrementalBR.put(
- bInfo.getBlock().getBlockId(), bInfo);
+ void notifyNamenodeBlockImmediately(
+ ReceivedDeletedBlockInfo bInfo, String storageUuid) {
+ synchronized (pendingIncrementalBRperStorage) {
+ addPendingReplicationBlockInfo(bInfo, storageUuid);
pendingReceivedRequests++;
- pendingIncrementalBR.notifyAll();
+ pendingIncrementalBRperStorage.notifyAll();
}
}
- void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo bInfo) {
- synchronized (pendingIncrementalBR) {
- pendingIncrementalBR.put(
- bInfo.getBlock().getBlockId(), bInfo);
+ void notifyNamenodeDeletedBlock(
+ ReceivedDeletedBlockInfo bInfo, String storageUuid) {
+ synchronized (pendingIncrementalBRperStorage) {
+ addPendingReplicationBlockInfo(bInfo, storageUuid);
}
}
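
The rewritten reportReceivedDeletedBlocks follows a drain/send/re-queue
pattern: drain the pending queues while holding the lock, perform the RPC
outside the lock, and on failure restore only those entries that were not
superseded in the meantime. A minimal self-contained sketch of that pattern,
with a plain map standing in for the per-storage state and a hypothetical
sendToNamenode stub:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DrainAndRequeueSketch {
    // Pending entries keyed by block ID; a newer put for the same block
    // ID supersedes the older one (last-writer-wins).
    private final Map<Long, String> pending = new HashMap<Long, String>();

    // Hypothetical RPC stub; returns false to simulate a send failure.
    private boolean sendToNamenode(List<Long> blockIds) {
        return false;
    }

    void report() {
        Map<Long, String> drained;
        synchronized (pending) {
            // Phase 1: drain under the lock so producers never observe a
            // half-drained queue.
            drained = new HashMap<Long, String>(pending);
            pending.clear();
        }
        boolean success = false;
        try {
            // Phase 2: the slow RPC runs outside the lock.
            success = sendToNamenode(new ArrayList<Long>(drained.keySet()));
        } finally {
            if (!success) {
                synchronized (pending) {
                    // Phase 3: re-queue, but never overwrite an entry a
                    // producer added while we were unlocked -- that entry
                    // is newer than the one we drained.
                    for (Map.Entry<Long, String> e : drained.entrySet()) {
                        if (!pending.containsKey(e.getKey())) {
                            pending.put(e.getKey(), e.getValue());
                        }
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        DrainAndRequeueSketch s = new DrainAndRequeueSketch();
        synchronized (s.pending) {
            s.pending.put(1L, "received");
        }
        s.report(); // the stub RPC fails, so the entry is re-queued
        System.out.println("pending after failed send: " + s.pending);
    }
}
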
@@ -334,13 +387,13 @@ class BPServiceActor implements Runnable
*/
@VisibleForTesting
void triggerBlockReportForTests() {
- synchronized (pendingIncrementalBR) {
+ synchronized (pendingIncrementalBRperStorage) {
lastBlockReport = 0;
lastHeartbeat = 0;
- pendingIncrementalBR.notifyAll();
+ pendingIncrementalBRperStorage.notifyAll();
while (lastBlockReport == 0) {
try {
- pendingIncrementalBR.wait(100);
+ pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) {
return;
}
@@ -350,12 +403,12 @@ class BPServiceActor implements Runnable
@VisibleForTesting
void triggerHeartbeatForTests() {
- synchronized (pendingIncrementalBR) {
+ synchronized (pendingIncrementalBRperStorage) {
lastHeartbeat = 0;
- pendingIncrementalBR.notifyAll();
+ pendingIncrementalBRperStorage.notifyAll();
while (lastHeartbeat == 0) {
try {
- pendingIncrementalBR.wait(100);
+ pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) {
return;
}
@@ -365,13 +418,13 @@ class BPServiceActor implements Runnable
@VisibleForTesting
void triggerDeletionReportForTests() {
- synchronized (pendingIncrementalBR) {
+ synchronized (pendingIncrementalBRperStorage) {
lastDeletedReport = 0;
- pendingIncrementalBR.notifyAll();
+ pendingIncrementalBRperStorage.notifyAll();
while (lastDeletedReport == 0) {
try {
- pendingIncrementalBR.wait(100);
+ pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) {
return;
}
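
All three triggerXForTests helpers use the same handshake: reset the
timestamp under the shared monitor, notifyAll to wake the actor, then wait
in a loop until the actor stamps the timestamp to signal completion. A
stripped-down sketch of the handshake; field names here are illustrative,
not the real BPServiceActor members:

public class TriggerHandshakeSketch {
    private final Object lock = new Object();
    private long lastReport = System.currentTimeMillis();

    // Test side: request a report, then block until the worker does it.
    void triggerForTests() throws InterruptedException {
        synchronized (lock) {
            lastReport = 0;       // zero means "report requested"
            lock.notifyAll();     // wake the worker if it is waiting
            while (lastReport == 0) {
                lock.wait(100);   // re-check periodically
            }
        }
    }

    // Worker side: runs in its own thread, services requests.
    void workerLoop() {
        while (true) {
            synchronized (lock) {
                if (lastReport == 0) {
                    lastReport = System.currentTimeMillis(); // "done"
                    lock.notifyAll();
                } else {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final TriggerHandshakeSketch s = new TriggerHandshakeSketch();
        Thread worker = new Thread(new Runnable() {
            public void run() { s.workerLoop(); }
        });
        worker.setDaemon(true);
        worker.start();
        s.triggerForTests();
        System.out.println("report serviced at " + s.lastReport);
    }
}
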
@@ -395,23 +448,38 @@ class BPServiceActor implements Runnable
// a FINALIZED one.
reportReceivedDeletedBlocks();
+ // Send one block report per known storage.
+
// Create block report
long brCreateStartTime = now();
- BlockListAsLongs bReport = dn.getFSDataset().getBlockReport(
- bpos.getBlockPoolId());
+ long totalBlockCount = 0;
+
+ Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
+ dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
// Send block report
long brSendStartTime = now();
- StorageBlockReport[] report = { new StorageBlockReport(
- new DatanodeStorage(bpRegistration.getStorageID()),
- bReport.getBlockListAsLongs()) };
- cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), report);
+ StorageBlockReport[] reports =
+ new StorageBlockReport[perVolumeBlockLists.size()];
+
+ int i = 0;
+ for (Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair :
+ perVolumeBlockLists.entrySet()) {
+ DatanodeStorage dnStorage = kvPair.getKey();
+ BlockListAsLongs blockList = kvPair.getValue();
+ totalBlockCount += blockList.getNumberOfBlocks();
+
+ reports[i++] =
+ new StorageBlockReport(
+ dnStorage, blockList.getBlockListAsLongs());
+ }
+
+ cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
// Log the block report processing stats from Datanode perspective
long brSendCost = now() - brSendStartTime;
long brCreateCost = brSendStartTime - brCreateStartTime;
dn.getMetrics().addBlockReport(brSendCost);
- LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+ LOG.info("BlockReport of " + totalBlockCount
+ " blocks took " + brCreateCost + " msec to generate and "
+ brSendCost + " msecs for RPC and NN processing");
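
The full block report now fans out per storage: getBlockReports hands back
one block list per DatanodeStorage, and the actor packs one
StorageBlockReport per entry while accumulating a total count for the log
line. A self-contained sketch of the fan-out with plain stand-in types:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerStorageReportSketch {

    /** Stand-in for StorageBlockReport: one storage and its block IDs. */
    static class StorageReport {
        final String storageUuid;
        final long[] blockIds;
        StorageReport(String storageUuid, long[] blockIds) {
            this.storageUuid = storageUuid;
            this.blockIds = blockIds;
        }
    }

    static StorageReport[] buildReports(Map<String, List<Long>> perStorage) {
        StorageReport[] reports = new StorageReport[perStorage.size()];
        long totalBlocks = 0;
        int i = 0;
        for (Map.Entry<String, List<Long>> e : perStorage.entrySet()) {
            List<Long> blocks = e.getValue();
            long[] ids = new long[blocks.size()];
            for (int j = 0; j < ids.length; j++) {
                ids[j] = blocks.get(j);
            }
            totalBlocks += ids.length;
            reports[i++] = new StorageReport(e.getKey(), ids);
        }
        // One log line for the whole datanode, as in the patch.
        System.out.println("BlockReport of " + totalBlocks + " blocks in "
            + reports.length + " storage reports");
        return reports;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> perStorage =
            new LinkedHashMap<String, List<Long>>();
        perStorage.put("storage-a", Arrays.asList(1L, 2L, 3L));
        perStorage.put("storage-b", Arrays.asList(4L));
        for (StorageReport r : buildReports(perStorage)) {
            System.out.println(r.storageUuid + " -> "
                + Arrays.toString(r.blockIds));
        }
    }
}
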
@@ -466,17 +534,15 @@ class BPServiceActor implements Runnable
}
HeartbeatResponse sendHeartBeat() throws IOException {
+ StorageReport[] reports =
+ dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
if (LOG.isDebugEnabled()) {
- LOG.debug("Sending heartbeat from service actor: " + this);
+ LOG.debug("Sending heartbeat with " + reports.length +
+ " storage reports from service actor: " + this);
}
- // reports number of failed volumes
- StorageReport[] report = { new StorageReport(bpRegistration.getStorageID(),
- false,
- dn.getFSDataset().getCapacity(),
- dn.getFSDataset().getDfsUsed(),
- dn.getFSDataset().getRemaining(),
- dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId())) };
- return bpNamenode.sendHeartbeat(bpRegistration, report,
+
+ return bpNamenode.sendHeartbeat(bpRegistration,
+ reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
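
The heartbeat change has the same shape: instead of fabricating a single
StorageReport from dataset-wide totals, the actor forwards one report per
storage. The point of the extra structure is that an aggregate hides
per-disk skew which per-storage figures preserve, as this small sketch
(with a simplified StorageReport stand-in) illustrates:

public class HeartbeatReportSketch {
    /** Stand-in for StorageReport: usage figures for one storage. */
    static class StorageReport {
        final String uuid;
        final long capacity;
        final long dfsUsed;
        StorageReport(String uuid, long capacity, long dfsUsed) {
            this.uuid = uuid;
            this.capacity = capacity;
            this.dfsUsed = dfsUsed;
        }
    }

    public static void main(String[] args) {
        StorageReport[] reports = {
            new StorageReport("storage-a", 100L, 95L),  // nearly full disk
            new StorageReport("storage-b", 100L, 5L) }; // nearly empty disk

        // The old single-report heartbeat could only convey the sum:
        long capacity = 0, used = 0;
        for (StorageReport r : reports) {
            capacity += r.capacity;
            used += r.dfsUsed;
        }
        System.out.println("aggregate: " + used + "/" + capacity + " used");

        // The per-storage heartbeat preserves the skew between disks:
        for (StorageReport r : reports) {
            System.out.println(r.uuid + ": " + r.dfsUsed + "/"
                + r.capacity + " used");
        }
    }
}
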
@@ -496,9 +562,9 @@ class BPServiceActor implements Runnable
}
private String formatThreadName() {
- Collection<URI> dataDirs = DataNode.getStorageDirs(dn.getConf());
- return "DataNode: [" +
- StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "] " +
+ Collection<StorageLocation> dataDirs =
+ DataNode.getStorageLocations(dn.getConf());
+ return "DataNode: [" + dataDirs.toString() + "] " +
" heartbeating to " + nnAddr;
}
@@ -608,10 +674,10 @@ class BPServiceActor implements Runnable
//
long waitTime = dnConf.heartBeatInterval -
(Time.now() - lastHeartbeat);
- synchronized(pendingIncrementalBR) {
+ synchronized(pendingIncrementalBRperStorage) {
if (waitTime > 0 && pendingReceivedRequests == 0) {
try {
- pendingIncrementalBR.wait(waitTime);
+ pendingIncrementalBRperStorage.wait(waitTime);
} catch (InterruptedException ie) {
LOG.warn("BPOfferService for " + this + " interrupted");
}
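
The main offer loop sleeps on the same monitor that the notify path signals:
it waits at most until the next heartbeat is due, but wakes immediately when
pendingReceivedRequests becomes nonzero. A sketch of that bounded wait, with
illustrative names; note the pending check must happen under the lock, or a
notifyAll landing between the check and the wait would be lost:

public class BoundedWaitSketch {
    private final Object pendingLock = new Object();
    private int pendingReceivedRequests = 0;
    private final long heartBeatInterval = 3000; // millis, illustrative

    // Producer side: enqueue work and wake the sleeping actor.
    void notifyBlockReceived() {
        synchronized (pendingLock) {
            pendingReceivedRequests++;
            pendingLock.notifyAll();
        }
    }

    // Actor side: sleep until the heartbeat is due or work arrives.
    void offerServiceOnce(long lastHeartbeat) {
        long waitTime = heartBeatInterval
            - (System.currentTimeMillis() - lastHeartbeat);
        synchronized (pendingLock) {
            // Check-then-wait under the lock to avoid a lost wakeup.
            if (waitTime > 0 && pendingReceivedRequests == 0) {
                try {
                    pendingLock.wait(waitTime);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        // ...send incremental reports and/or a heartbeat here...
    }

    public static void main(String[] args) {
        BoundedWaitSketch s = new BoundedWaitSketch();
        s.notifyBlockReceived();
        s.offerServiceOnce(System.currentTimeMillis()); // returns at once
        System.out.println("pending=" + s.pendingReceivedRequests);
    }
}
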
@@ -782,4 +848,68 @@ class BPServiceActor implements Runnable
}
}
+ private static class PerStoragePendingIncrementalBR {
+ private Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR =
+ Maps.newHashMap();
+
+ /**
+ * Return the number of blocks on this storage that have pending
+ * incremental block reports.
+ * @return the number of pending report entries for this storage
+ */
+ int getBlockInfoCount() {
+ return pendingIncrementalBR.size();
+ }
+
+ /**
+ * Dequeue and return all pending incremental block report state.
+ * @return the dequeued entries; the pending map is left empty
+ */
+ ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
+ ReceivedDeletedBlockInfo[] blockInfos =
+ pendingIncrementalBR.values().toArray(
+ new ReceivedDeletedBlockInfo[getBlockInfoCount()]);
+
+ pendingIncrementalBR.clear();
+ return blockInfos;
+ }
+
+ /**
+ * Add blocks from blockArray to pendingIncrementalBR, unless the
+ * block already exists in pendingIncrementalBR.
+ * @param blockArray list of blocks to add.
+ * @return the number of missing blocks that we added.
+ */
+ int putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) {
+ int blocksPut = 0;
+ for (ReceivedDeletedBlockInfo rdbi : blockArray) {
+ if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
+ pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
+ ++blocksPut;
+ }
+ }
+ return blocksPut;
+ }
+
+ /**
+ * Add a pending incremental block report for a single block, replacing
+ * any existing entry for the same block ID.
+ * @param blockInfo the block state to queue
+ */
+ void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
+ pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo);
+ }
+
+ /**
+ * Remove pending incremental block report for a single block if it
+ * exists.
+ *
+ * @param blockInfo the block whose pending report should be removed
+ * @return true if a report was removed, false if no report existed for
+ * the given block.
+ */
+ boolean removeBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
+ return (pendingIncrementalBR.remove(blockInfo.getBlock().getBlockId()) != null);
+ }
+ }
}
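
PerStoragePendingIncrementalBR is a last-writer-wins map keyed by block ID,
and addPendingReplicationBlockInfo (above) preserves a cross-storage
invariant: at most one per-storage queue may hold an entry for a given
block, so a block that moves between storages is evicted from its old queue
before being enqueued under the new one. A simplified re-implementation of
just that invariant, not the real (package-private) class:

import java.util.HashMap;
import java.util.Map;

public class CrossStorageDedupeSketch {
    // storageUuid -> (blockId -> state); mirrors the shape of
    // pendingIncrementalBRperStorage, nothing more.
    private final Map<String, Map<Long, String>> perStorage =
        new HashMap<String, Map<Long, String>>();

    private Map<Long, String> mapFor(String storageUuid) {
        Map<Long, String> m = perStorage.get(storageUuid);
        if (m == null) {
            m = new HashMap<Long, String>();
            perStorage.put(storageUuid, m);
        }
        return m;
    }

    /** Evict the block from every queue before enqueueing it under the
     *  (possibly different) storage that now holds it. */
    void addPending(long blockId, String state, String storageUuid) {
        for (Map<Long, String> m : perStorage.values()) {
            if (m.remove(blockId) != null) {
                break; // invariant: at most one entry can exist
            }
        }
        mapFor(storageUuid).put(blockId, state);
    }

    public static void main(String[] args) {
        CrossStorageDedupeSketch s = new CrossStorageDedupeSketch();
        s.addPending(7L, "RECEIVING", "storage-a");
        s.addPending(7L, "RECEIVED", "storage-b"); // block moved storages
        System.out.println(s.perStorage); // only storage-b holds block 7
    }
}
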
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Fri Dec 13 17:28:14 2013
@@ -187,7 +187,7 @@ class BlockPoolSliceScanner {
+ hours + " hours for block pool " + bpid);
// get the list of blocks and arrange them in random order
- List<Block> arr = dataset.getFinalizedBlocks(blockPoolId);
+ List<FinalizedReplica> arr = dataset.getFinalizedBlocks(blockPoolId);
Collections.shuffle(arr);
long scanTime = -1;
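
The only scanner change here is the element type: getFinalizedBlocks now
returns FinalizedReplica objects rather than bare Blocks, and the list is
still shuffled so the scan order is randomized. As a trivial sketch of the
shuffle step, on a stand-in list:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ShuffledScanOrderSketch {
    public static void main(String[] args) {
        // Stand-in for the finalized replica list from the dataset.
        List<String> replicas = new ArrayList<String>(
            Arrays.asList("replica-1", "replica-2", "replica-3"));
        // Randomize so repeated scans do not always follow disk order.
        Collections.shuffle(replicas);
        System.out.println("scan order: " + replicas);
    }
}
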
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Dec 13 17:28:14 2013
@@ -162,7 +162,8 @@ class BlockReceiver implements Closeable
switch (stage) {
case PIPELINE_SETUP_CREATE:
replicaInfo = datanode.data.createRbw(block);
- datanode.notifyNamenodeReceivingBlock(block);
+ datanode.notifyNamenodeReceivingBlock(
+ block, replicaInfo.getStorageUuid());
break;
case PIPELINE_SETUP_STREAMING_RECOVERY:
replicaInfo = datanode.data.recoverRbw(
@@ -176,7 +177,8 @@ class BlockReceiver implements Closeable
block.getLocalBlock());
}
block.setGenerationStamp(newGs);
- datanode.notifyNamenodeReceivingBlock(block);
+ datanode.notifyNamenodeReceivingBlock(
+ block, replicaInfo.getStorageUuid());
break;
case PIPELINE_SETUP_APPEND_RECOVERY:
replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
@@ -185,7 +187,8 @@ class BlockReceiver implements Closeable
block.getLocalBlock());
}
block.setGenerationStamp(newGs);
- datanode.notifyNamenodeReceivingBlock(block);
+ datanode.notifyNamenodeReceivingBlock(
+ block, replicaInfo.getStorageUuid());
break;
case TRANSFER_RBW:
case TRANSFER_FINALIZED:
@@ -252,6 +255,10 @@ class BlockReceiver implements Closeable
/** Return the datanode object. */
DataNode getDataNode() {return datanode;}
+ String getStorageUuid() {
+ return replicaInfo.getStorageUuid();
+ }
+
/**
* close files.
*/
@@ -1073,14 +1080,15 @@ class BlockReceiver implements Closeable
: 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
- datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+ datanode.closeBlock(
+ block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
if (ClientTraceLog.isInfoEnabled() && isClient) {
long offset = 0;
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block
.getBlockPoolId());
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT, inAddr,
myAddr, block.getNumBytes(), "HDFS_WRITE", clientname, offset,
- dnR.getStorageID(), block, endTime - startTime));
+ dnR.getDatanodeUuid(), block, endTime - startTime));
} else {
LOG.info("Received " + block + " size " + block.getNumBytes()
+ " from " + inAddr);