You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ki...@apache.org on 2013/01/31 19:51:25 UTC
svn commit: r1441117 - in
/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/main/java/org/apache/hadoop/hdfs/se...
Author: kihwal
Date: Thu Jan 31 18:51:25 2013
New Revision: 1441117
URL: http://svn.apache.org/viewvc?rev=1441117&view=rev
Log:
merge -r 1161991:1161992 Merging from trunk to branch-0.23 to fix HDFS-395
Added:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java (with props)
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1441117&r1=1441116&r2=1441117&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Jan 31 18:51:25 2013
@@ -5,6 +5,8 @@ Release 0.23.7 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
+ HDFS-395. DFS Scalability: Incremental block reports. (Tomasz Nykiel
+ via hairong)
IMPROVEMENTS
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1441117&r1=1441116&r2=1441117&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Thu Jan 31 18:51:25 2013
@@ -63,10 +63,12 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Daemon;
@@ -2129,7 +2131,7 @@ public class BlockManager {
* Modify (block-->datanode) map. Possibly generate replication tasks, if the
* removed block is still valid.
*/
- private void removeStoredBlock(Block block, DatanodeDescriptor node) {
+ public void removeStoredBlock(Block block, DatanodeDescriptor node) {
if(blockLog.isDebugEnabled()) {
blockLog.debug("BLOCK* removeStoredBlock: "
+ block + " from " + node.getName());
@@ -2248,27 +2250,48 @@ public class BlockManager {
}
}
- /** The given node is reporting that it received a certain block. */
- public void blockReceived(final DatanodeID nodeID, final String poolId,
- final Block block, final String delHint) throws IOException {
+ /** The given node is reporting that it received/deleted certain blocks. */
+ public void blockReceivedAndDeleted(final DatanodeID nodeID,
+ final String poolId,
+ final ReceivedDeletedBlockInfo receivedAndDeletedBlocks[]
+ ) throws IOException {
namesystem.writeLock();
+ int received = 0;
+ int deleted = 0;
try {
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive) {
- final String s = block + " is received from dead or unregistered node "
- + nodeID.getName();
- blockLog.warn("BLOCK* blockReceived: " + s);
- throw new IOException(s);
- }
-
- if (blockLog.isDebugEnabled()) {
- blockLog.debug("BLOCK* blockReceived: " + block
- + " is received from " + nodeID.getName());
+ blockLog.warn("BLOCK* blockReceivedDeleted"
+ + " is received from dead or unregistered node "
+ + nodeID.getName());
+ throw new IOException(
+ "Got blockReceivedDeleted message from unregistered or dead node");
}
- addBlock(node, block, delHint);
+ for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
+ if (receivedAndDeletedBlocks[i].isDeletedBlock()) {
+ removeStoredBlock(
+ receivedAndDeletedBlocks[i].getBlock(), node);
+ deleted++;
+ } else {
+ addBlock(node, receivedAndDeletedBlocks[i].getBlock(),
+ receivedAndDeletedBlocks[i].getDelHints());
+ received++;
+ }
+ if (blockLog.isDebugEnabled()) {
+ blockLog.debug("BLOCK* block"
+ + (receivedAndDeletedBlocks[i].isDeletedBlock() ? "Deleted"
+ : "Received") + ": " + receivedAndDeletedBlocks[i].getBlock()
+ + " is received from " + nodeID.getName());
+ }
+ }
} finally {
namesystem.writeUnlock();
+ if (blockLog.isDebugEnabled()) {
+ blockLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
+ + nodeID.getName() + " received: " + received + ", "
+ + " deleted: " + deleted);
+ }
}
}
@@ -2447,6 +2470,7 @@ public class BlockManager {
}
public void removeBlock(Block block) {
+ block.setNumBytes(BlockCommand.NO_ACK);
addToInvalidates(block);
corruptReplicas.removeFromCorruptReplicasMap(block);
blocksMap.removeBlock(block);
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1441117&r1=1441116&r2=1441117&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Thu Jan 31 18:51:25 2013
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -86,6 +87,7 @@ class BPOfferService implements Runnable
DatanodeRegistration bpRegistration;
long lastBlockReport = 0;
+ long lastDeletedReport = 0;
boolean resetBlockReportTime = true;
@@ -93,8 +95,9 @@ class BPOfferService implements Runnable
DatanodeProtocol bpNamenode;
private long lastHeartbeat = 0;
private volatile boolean initialized = false;
- private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
- private final LinkedList<String> delHints = new LinkedList<String>();
+ private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList
+ = new LinkedList<ReceivedDeletedBlockInfo>();
+ private volatile int pendingReceivedRequests = 0;
private volatile boolean shouldServiceRun = true;
UpgradeManagerDatanode upgradeManager = null;
private final DataNode dn;
@@ -111,12 +114,12 @@ class BPOfferService implements Runnable
*/
@VisibleForTesting
void triggerHeartbeatForTests() throws IOException {
- synchronized(receivedBlockList) {
+ synchronized(receivedAndDeletedBlockList) {
lastHeartbeat = 0;
- receivedBlockList.notifyAll();
+ receivedAndDeletedBlockList.notifyAll();
while (lastHeartbeat == 0) {
try {
- receivedBlockList.wait(100);
+ receivedAndDeletedBlockList.wait(100);
} catch (InterruptedException e) {
return;
}
@@ -288,69 +291,76 @@ class BPOfferService implements Runnable
* Report received blocks and delete hints to the Namenode
* @throws IOException
*/
- private void reportReceivedBlocks() throws IOException {
- //check if there are newly received blocks
- Block [] blockArray=null;
- String [] delHintArray=null;
- synchronized(receivedBlockList) {
- synchronized(delHints){
- int numBlocks = receivedBlockList.size();
- if (numBlocks > 0) {
- if(numBlocks!=delHints.size()) {
- LOG.warn("Panic: receiveBlockList and delHints are not of " +
- "the same length" );
- }
- //
- // Send newly-received blockids to namenode
- //
- blockArray = receivedBlockList.toArray(new Block[numBlocks]);
- delHintArray = delHints.toArray(new String[numBlocks]);
- }
+ private void reportReceivedDeletedBlocks() throws IOException {
+ // check if there are newly received blocks
+ ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
+ int currentReceivedRequestsCounter;
+ synchronized (receivedAndDeletedBlockList) {
+ currentReceivedRequestsCounter = pendingReceivedRequests;
+ int numBlocks = receivedAndDeletedBlockList.size();
+ if (numBlocks > 0) {
+ //
+ // Send newly-received and deleted blockids to namenode
+ //
+ receivedAndDeletedBlockArray = receivedAndDeletedBlockList
+ .toArray(new ReceivedDeletedBlockInfo[numBlocks]);
}
}
- if (blockArray != null) {
- if(delHintArray == null || delHintArray.length != blockArray.length ) {
- LOG.warn("Panic: block array & delHintArray are not the same" );
- }
- bpNamenode.blockReceived(bpRegistration, getBlockPoolId(), blockArray,
- delHintArray);
- synchronized(receivedBlockList) {
- synchronized(delHints){
- for(int i=0; i<blockArray.length; i++) {
- receivedBlockList.remove(blockArray[i]);
- delHints.remove(delHintArray[i]);
- }
+ if (receivedAndDeletedBlockArray != null) {
+ bpNamenode.blockReceivedAndDeleted(bpRegistration, getBlockPoolId(),
+ receivedAndDeletedBlockArray);
+ synchronized (receivedAndDeletedBlockList) {
+ for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
+ receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
}
+ pendingReceivedRequests -= currentReceivedRequestsCounter;
}
}
}
+
+
/*
* Informing the name node could take a long long time! Should we wait
* till namenode is informed before responding with success to the
* client? For now we don't.
*/
void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
- if(block==null || delHint==null) {
- throw new IllegalArgumentException(
- block==null?"Block is null":"delHint is null");
+ if (block == null || delHint == null) {
+ throw new IllegalArgumentException(block == null ? "Block is null"
+ : "delHint is null");
}
-
+
if (!block.getBlockPoolId().equals(getBlockPoolId())) {
LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
+ getBlockPoolId());
return;
}
-
- synchronized (receivedBlockList) {
- synchronized (delHints) {
- receivedBlockList.add(block.getLocalBlock());
- delHints.add(delHint);
- receivedBlockList.notifyAll();
- }
+
+ synchronized (receivedAndDeletedBlockList) {
+ receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
+ .getLocalBlock(), delHint));
+ pendingReceivedRequests++;
+ receivedAndDeletedBlockList.notifyAll();
}
}
+ void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+ if (block == null) {
+ throw new IllegalArgumentException("Block is null");
+ }
+
+ if (!block.getBlockPoolId().equals(getBlockPoolId())) {
+ LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
+ + getBlockPoolId());
+ return;
+ }
+
+ synchronized (receivedAndDeletedBlockList) {
+ receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
+ .getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT));
+ }
+ }
/**
* Report the list blocks to the Namenode
@@ -460,7 +470,8 @@ class BPOfferService implements Runnable
* forever calling remote NameNode functions.
*/
private void offerService() throws Exception {
- LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
+ LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
+ + dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
+ dnConf.blockReportInterval + "msec" + " Initial delay: "
+ dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
+ dnConf.heartBeatInterval);
@@ -499,7 +510,11 @@ class BPOfferService implements Runnable
}
}
- reportReceivedBlocks();
+ if (pendingReceivedRequests > 0
+ || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
+ reportReceivedDeletedBlocks();
+ lastDeletedReport = startTime;
+ }
DatanodeCommand cmd = blockReport();
processCommand(cmd);
@@ -515,10 +530,10 @@ class BPOfferService implements Runnable
//
long waitTime = dnConf.heartBeatInterval -
(System.currentTimeMillis() - lastHeartbeat);
- synchronized(receivedBlockList) {
- if (waitTime > 0 && receivedBlockList.size() == 0) {
+ synchronized(receivedAndDeletedBlockList) {
+ if (waitTime > 0 && pendingReceivedRequests == 0) {
try {
- receivedBlockList.wait(waitTime);
+ receivedAndDeletedBlockList.wait(waitTime);
} catch (InterruptedException ie) {
LOG.warn("BPOfferService for " + this + " interrupted");
}
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1441117&r1=1441116&r2=1441117&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Thu Jan 31 18:51:25 2013
@@ -55,6 +55,7 @@ class DNConf {
final long readaheadLength;
final long heartBeatInterval;
final long blockReportInterval;
+ final long deleteReportInterval;
final long initialBlockReportDelay;
final int writePacketSize;
@@ -105,6 +106,7 @@ class DNConf {
heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
+ this.deleteReportInterval = 100 * heartBeatInterval;
// do we need to sync block file contents to disk when blockfile is closed?
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
DFS_DATANODE_SYNCONCLOSE_DEFAULT);
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1441117&r1=1441116&r2=1441117&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Jan 31 18:51:25 2013
@@ -638,6 +638,17 @@ public class DataNode extends Configured
}
}
+ // calls specific to BP
+ protected void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+ BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
+ if (bpos != null) {
+ bpos.notifyNamenodeDeletedBlock(block);
+ } else {
+ LOG.warn("Cannot find BPOfferService for reporting block deleted for bpid="
+ + block.getBlockPoolId());
+ }
+ }
+
public void reportBadBlocks(ExtendedBlock block) throws IOException{
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos == null || bpos.bpNamenode == null) {
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1441117&r1=1441116&r2=1441117&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Jan 31 18:51:25 2013
@@ -1122,7 +1122,7 @@ class FSDataset implements FSDatasetInte
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
roots[idx] = storage.getStorageDir(idx).getCurrentDir();
}
- asyncDiskService = new FSDatasetAsyncDiskService(roots);
+ asyncDiskService = new FSDatasetAsyncDiskService(this, roots);
registerMBean(storage.getStorageID());
}
@@ -2053,15 +2053,19 @@ class FSDataset implements FSDatasetInte
volumeMap.remove(bpid, invalidBlks[i]);
}
File metaFile = DatanodeUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp());
-
+
// Delete the block asynchronously to make sure we can do it fast enough
- asyncDiskService.deleteAsync(v, bpid, f, metaFile,
- invalidBlks[i].toString());
+ asyncDiskService.deleteAsync(v, f, metaFile,
+ new ExtendedBlock(bpid, invalidBlks[i]));
}
if (error) {
throw new IOException("Error in deleting blocks.");
}
}
+
+ public void notifyNamenodeDeletedBlock(ExtendedBlock block){
+ datanode.notifyNamenodeDeletedBlock(block);
+ }
@Override // {@link FSDatasetInterface}
public synchronized boolean contains(final ExtendedBlock block) {
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=1441117&r1=1441116&r2=1441117&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Thu Jan 31 18:51:25 2013
@@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
/*
* This class is a container of multiple thread pools, each for a volume,
@@ -47,6 +49,8 @@ import org.apache.commons.logging.LogFac
*/
class FSDatasetAsyncDiskService {
+ final FSDataset dataset;
+
public static final Log LOG = LogFactory.getLog(FSDatasetAsyncDiskService.class);
// ThreadPool core pool size
@@ -70,8 +74,8 @@ class FSDatasetAsyncDiskService {
*
* @param volumes The roots of the data volumes.
*/
- FSDatasetAsyncDiskService(File[] volumes) {
-
+ FSDatasetAsyncDiskService(FSDataset dataset, File[] volumes) {
+ this.dataset = dataset;
// Create one ThreadPool per volume
for (int v = 0 ; v < volumes.length; v++) {
final File vol = volumes[v];
@@ -147,13 +151,12 @@ class FSDatasetAsyncDiskService {
* Delete the block file and meta file from the disk asynchronously, adjust
* dfsUsed statistics accordingly.
*/
- void deleteAsync(FSDataset.FSVolume volume, String bpid, File blockFile,
- File metaFile, String blockName) {
- DataNode.LOG.info("Scheduling block " + blockName + " file " + blockFile
- + " for deletion");
- ReplicaFileDeleteTask deletionTask =
- new ReplicaFileDeleteTask(volume, bpid, blockFile, metaFile,
- blockName);
+ void deleteAsync(FSDataset.FSVolume volume, File blockFile, File metaFile,
+ ExtendedBlock block) {
+ DataNode.LOG.info("Scheduling block " + block.getLocalBlock().toString()
+ + " file " + blockFile + " for deletion");
+ ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(dataset,
+ volume, blockFile, metaFile, block);
execute(volume.getCurrentDir(), deletionTask);
}
@@ -161,19 +164,19 @@ class FSDatasetAsyncDiskService {
* as decrement the dfs usage of the volume.
*/
static class ReplicaFileDeleteTask implements Runnable {
+ final FSDataset dataset;
final FSDataset.FSVolume volume;
- final String blockPoolId;
final File blockFile;
final File metaFile;
- final String blockName;
+ final ExtendedBlock block;
- ReplicaFileDeleteTask(FSDataset.FSVolume volume, String bpid,
- File blockFile, File metaFile, String blockName) {
+ ReplicaFileDeleteTask(FSDataset dataset, FSDataset.FSVolume volume, File blockFile,
+ File metaFile, ExtendedBlock block) {
+ this.dataset = dataset;
this.volume = volume;
- this.blockPoolId = bpid;
this.blockFile = blockFile;
this.metaFile = metaFile;
- this.blockName = blockName;
+ this.block = block;
}
FSDataset.FSVolume getVolume() {
@@ -183,9 +186,9 @@ class FSDatasetAsyncDiskService {
@Override
public String toString() {
// Called in AsyncDiskService.execute for displaying error messages.
- return "deletion of block " + blockPoolId + " " + blockName
- + " with block file " + blockFile + " and meta file " + metaFile
- + " from volume " + volume;
+ return "deletion of block " + block.getBlockPoolId() + " "
+ + block.getLocalBlock().toString() + " with block file " + blockFile
+ + " and meta file " + metaFile + " from volume " + volume;
}
@Override
@@ -193,12 +196,15 @@ class FSDatasetAsyncDiskService {
long dfsBytes = blockFile.length() + metaFile.length();
if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
DataNode.LOG.warn("Unexpected error trying to delete block "
- + blockPoolId + " " + blockName + " at file " + blockFile
- + ". Ignored.");
+ + block.getBlockPoolId() + " " + block.getLocalBlock().toString()
+ + " at file " + blockFile + ". Ignored.");
} else {
- volume.decDfsUsed(blockPoolId, dfsBytes);
- DataNode.LOG.info("Deleted block " + blockPoolId + " " + blockName
- + " at file " + blockFile);
+ if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
+ dataset.notifyNamenodeDeletedBlock(block);
+ }
+ volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
+ DataNode.LOG.info("Deleted block " + block.getBlockPoolId() + " "
+ + block.getLocalBlock().toString() + " at file " + blockFile);
}
}
};
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1441117&r1=1441116&r2=1441117&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Thu Jan 31 18:51:25 2013
@@ -74,6 +74,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.EnumSetWritable;
@@ -792,19 +793,18 @@ class NameNodeRpcServer implements Namen
}
@Override // DatanodeProtocol
- public void blockReceived(DatanodeRegistration nodeReg, String poolId,
- Block blocks[], String delHints[]) throws IOException {
+ public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
+ ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException {
verifyRequest(nodeReg);
- if(blockStateChangeLog.isDebugEnabled()) {
- blockStateChangeLog.debug("*BLOCK* NameNode.blockReceived: "
- +"from "+nodeReg.getName()+" "+blocks.length+" blocks.");
- }
- for (int i = 0; i < blocks.length; i++) {
- namesystem.getBlockManager().blockReceived(
- nodeReg, poolId, blocks[i], delHints[i]);
+ if(stateChangeLog.isDebugEnabled()) {
+ blockStateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
+ +"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length
+ +" blocks.");
}
+ namesystem.getBlockManager().blockReceivedAndDeleted(
+ nodeReg, poolId, receivedAndDeletedBlocks);
}
-
+
@Override // DatanodeProtocol
public void errorReport(DatanodeRegistration nodeReg,
int errorCode, String msg) throws IOException {
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java?rev=1441117&r1=1441116&r2=1441117&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java Thu Jan 31 18:51:25 2013
@@ -44,6 +44,16 @@ import org.apache.hadoop.io.WritableFact
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockCommand extends DatanodeCommand {
+
+ /**
+ * This constant is used to indicate that the block deletion does not need
+ * explicit ACK from the datanode. When a block is put into the list of blocks
+ * to be deleted, it's size is set to this constant. We assume that no block
+ * would actually have this size. Otherwise, we would miss ACKs for blocks
+ * with such size. Positive number is used for compatibility reasons.
+ */
+ public static final long NO_ACK = Long.MAX_VALUE;
+
String poolId;
Block blocks[];
DatanodeInfo targets[][];
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1441117&r1=1441116&r2=1441117&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Thu Jan 31 18:51:25 2013
@@ -126,17 +126,19 @@ public interface DatanodeProtocol extend
long[] blocks) throws IOException;
/**
- * blockReceived() allows the DataNode to tell the NameNode about
- * recently-received block data, with a hint for pereferred replica
- * to be deleted when there is any excessive blocks.
+ * blockReceivedAndDeleted() allows the DataNode to tell the NameNode about
+ * recently-received and -deleted block data.
+ *
+ * For the case of received blocks, a hint for preferred replica to be
+ * deleted when there is any excessive blocks is provided.
* For example, whenever client code
* writes a new Block here, or another DataNode copies a Block to
* this DataNode, it will call blockReceived().
*/
- public void blockReceived(DatanodeRegistration registration,
+ public void blockReceivedAndDeleted(DatanodeRegistration registration,
String poolId,
- Block blocks[],
- String[] delHints) throws IOException;
+ ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks)
+ throws IOException;
/**
* errorReport() tells the NameNode about something that has gone
Added: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java?rev=1441117&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java Thu Jan 31 18:51:25 2013
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A data structure to store Block and delHints together, used to send
+ * received/deleted ACKs.
+ */
+public class ReceivedDeletedBlockInfo implements Writable {
+ Block block;
+ String delHints;
+
+ public final static String TODELETE_HINT = "-";
+
+ public ReceivedDeletedBlockInfo() {
+ }
+
+ public ReceivedDeletedBlockInfo(Block blk, String delHints) {
+ this.block = blk;
+ this.delHints = delHints;
+ }
+
+ public Block getBlock() {
+ return this.block;
+ }
+
+ public void setBlock(Block blk) {
+ this.block = blk;
+ }
+
+ public String getDelHints() {
+ return this.delHints;
+ }
+
+ public void setDelHints(String hints) {
+ this.delHints = hints;
+ }
+
+ public boolean equals(Object o) {
+ if (!(o instanceof ReceivedDeletedBlockInfo)) {
+ return false;
+ }
+ ReceivedDeletedBlockInfo other = (ReceivedDeletedBlockInfo) o;
+ return this.block.equals(other.getBlock())
+ && this.delHints.equals(other.delHints);
+ }
+
+ public int hashCode() {
+ assert false : "hashCode not designed";
+ return 0;
+ }
+
+ public boolean blockEquals(Block b) {
+ return this.block.equals(b);
+ }
+
+ public boolean isDeletedBlock() {
+ return delHints.equals(TODELETE_HINT);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ this.block.write(out);
+ Text.writeString(out, this.delHints);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.block = new Block();
+ this.block.readFields(in);
+ this.delHints = Text.readString(in);
+ }
+
+ public String toString() {
+ return block.toString() + ", delHint: " + delHints;
+ }
+}
Propchange: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=1441117&r1=1441116&r2=1441117&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Thu Jan 31 18:51:25 2013
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.EnumSetWritable;
@@ -880,10 +881,10 @@ public class NNThroughputBenchmark {
receivedDNReg.setStorageInfo(
new DataStorage(nsInfo, dnInfo.getStorageID()));
receivedDNReg.setInfoPort(dnInfo.getInfoPort());
- nameNodeProto.blockReceived( receivedDNReg,
- nameNode.getNamesystem().getBlockPoolId(),
- new Block[] {blocks[i]},
- new String[] {DataNode.EMPTY_DEL_HINT});
+ nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
+ .getNamesystem().getBlockPoolId(),
+ new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
+ blocks[i], DataNode.EMPTY_DEL_HINT) });
}
}
return blocks.length;
@@ -995,11 +996,10 @@ public class NNThroughputBenchmark {
for(DatanodeInfo dnInfo : loc.getLocations()) {
int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock());
- nameNodeProto.blockReceived(
- datanodes[dnIdx].dnRegistration,
- loc.getBlock().getBlockPoolId(),
- new Block[] {loc.getBlock().getLocalBlock()},
- new String[] {""});
+ nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
+ .getBlock().getBlockPoolId(),
+ new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(loc
+ .getBlock().getLocalBlock(), "") });
}
}
return prevBlock;
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java?rev=1441117&r1=1441116&r2=1441117&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java Thu Jan 31 18:51:25 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.junit.After;
import org.junit.Test;
@@ -104,12 +105,12 @@ public class TestDeadDatanode {
DatanodeProtocol dnp = cluster.getNameNodeRpc();
- Block[] blocks = new Block[] { new Block(0) };
- String[] delHints = new String[] { "" };
+ ReceivedDeletedBlockInfo[] blocks = { new ReceivedDeletedBlockInfo(
+ new Block(0), "") };
// Ensure blockReceived call from dead datanode is rejected with IOException
try {
- dnp.blockReceived(reg, poolId, blocks, delHints);
+ dnp.blockReceivedAndDeleted(reg, poolId, blocks);
Assert.fail("Expected IOException is not thrown");
} catch (IOException ex) {
// Expected