You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/10/23 03:28:49 UTC
svn commit: r1534891 - in
/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/
src/main/java/org/apache/hadoop/hdfs/s...
Author: arp
Date: Wed Oct 23 01:28:48 2013
New Revision: 1534891
URL: http://svn.apache.org/r1534891
Log:
HDFS-5390. Send one incremental block report per storage directory.
Modified:
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt Wed Oct 23 01:28:48 2013
@@ -42,3 +42,6 @@ IMPROVEMENTS:
HDFS-5398. NameNode changes to process storage reports per storage
directory. (Arpit Agarwal)
+
+ HDFS-5390. Send one incremental block report per storage directory.
+ (Arpit Agarwal)
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Wed Oct 23 01:28:48 2013
@@ -192,7 +192,8 @@ class BPOfferService {
* till namenode is informed before responding with success to the
* client? For now we don't.
*/
- void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+ void notifyNamenodeReceivedBlock(
+ ExtendedBlock block, String delHint, String storageUuid) {
checkBlock(block);
checkDelHint(delHint);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
@@ -201,7 +202,7 @@ class BPOfferService {
delHint);
for (BPServiceActor actor : bpServices) {
- actor.notifyNamenodeBlockImmediately(bInfo);
+ actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
}
}
@@ -218,23 +219,23 @@ class BPOfferService {
"delHint is null");
}
- void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+ void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
checkBlock(block);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null);
for (BPServiceActor actor : bpServices) {
- actor.notifyNamenodeDeletedBlock(bInfo);
+ actor.notifyNamenodeDeletedBlock(bInfo, storageUuid);
}
}
- void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+ void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
checkBlock(block);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null);
for (BPServiceActor actor : bpServices) {
- actor.notifyNamenodeBlockImmediately(bInfo);
+ actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Wed Oct 23 01:28:48 2013
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.Collection;
-import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -94,9 +93,9 @@ class BPServiceActor implements Runnable
* keyed by block ID, contains the pending changes which have yet to be
* reported to the NN. Access should be synchronized on this object.
*/
- private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR
- = Maps.newHashMap();
-
+ private final Map<String, PerStoragePendingIncrementalBR>
+ pendingIncrementalBRperStorage = Maps.newConcurrentMap();
+
private volatile int pendingReceivedRequests = 0;
private volatile boolean shouldServiceRun = true;
private final DataNode dn;
@@ -263,64 +262,84 @@ class BPServiceActor implements Runnable
* @throws IOException
*/
private void reportReceivedDeletedBlocks() throws IOException {
-
- // check if there are newly received blocks
- ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
- synchronized (pendingIncrementalBR) {
- int numBlocks = pendingIncrementalBR.size();
- if (numBlocks > 0) {
- //
- // Send newly-received and deleted blockids to namenode
- //
- receivedAndDeletedBlockArray = pendingIncrementalBR
- .values().toArray(new ReceivedDeletedBlockInfo[numBlocks]);
+ // For each storage, check if there are newly received blocks and if
+ // so then send an incremental report to the NameNode.
+ for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
+ pendingIncrementalBRperStorage.entrySet()) {
+ final String storageUuid = entry.getKey();
+ final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
+ ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
+ // TODO: We can probably use finer-grained synchronization now.
+ synchronized (pendingIncrementalBRperStorage) {
+ if (perStorageMap.getBlockInfoCount() > 0) {
+ // Send newly-received and deleted blockids to namenode
+ receivedAndDeletedBlockArray = perStorageMap.dequeueBlockInfos();
+ pendingReceivedRequests -= receivedAndDeletedBlockArray.length;
+ }
}
- pendingIncrementalBR.clear();
- }
- if (receivedAndDeletedBlockArray != null) {
- StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
- bpRegistration.getDatanodeUuid(), receivedAndDeletedBlockArray) };
- boolean success = false;
- try {
- bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
- report);
- success = true;
- } finally {
- synchronized (pendingIncrementalBR) {
- if (!success) {
- // If we didn't succeed in sending the report, put all of the
- // blocks back onto our queue, but only in the case where we didn't
- // put something newer in the meantime.
- for (ReceivedDeletedBlockInfo rdbi : receivedAndDeletedBlockArray) {
- if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
- pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
- }
+
+ if (receivedAndDeletedBlockArray != null) {
+ StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
+ storageUuid, receivedAndDeletedBlockArray) };
+ boolean success = false;
+ try {
+ bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
+ report);
+ success = true;
+ } finally {
+ synchronized (pendingIncrementalBRperStorage) {
+ if (!success) {
+ // If we didn't succeed in sending the report, put all of the
+ // blocks back onto our queue, but only in the case where we
+ // didn't put something newer in the meantime.
+ perStorageMap.putMissingBlockInfos(receivedAndDeletedBlockArray);
+ pendingReceivedRequests += perStorageMap.getBlockInfoCount();
}
}
- pendingReceivedRequests = pendingIncrementalBR.size();
}
}
}
}
+ /**
+ * Retrieve the incremental BR state for a given storage UUID
+ * @param storageUuid
+ * @return
+ */
+ private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
+ String storageUuid) {
+ PerStoragePendingIncrementalBR mapForStorage =
+ pendingIncrementalBRperStorage.get(storageUuid);
+
+ if (mapForStorage == null) {
+ // This is the first time we are adding incremental BR state for
+ // this storage so create a new map. This is required once per
+ // storage, per service actor.
+ mapForStorage = new PerStoragePendingIncrementalBR();
+ pendingIncrementalBRperStorage.put(storageUuid, mapForStorage);
+ }
+
+ return mapForStorage;
+ }
+
/*
* Informing the name node could take a long long time! Should we wait
* till namenode is informed before responding with success to the
* client? For now we don't.
*/
- void notifyNamenodeBlockImmediately(ReceivedDeletedBlockInfo bInfo) {
- synchronized (pendingIncrementalBR) {
- pendingIncrementalBR.put(
- bInfo.getBlock().getBlockId(), bInfo);
+ void notifyNamenodeBlockImmediately(
+ ReceivedDeletedBlockInfo bInfo, String storageUuid) {
+ synchronized (pendingIncrementalBRperStorage) {
+ getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
pendingReceivedRequests++;
- pendingIncrementalBR.notifyAll();
+ pendingIncrementalBRperStorage.notifyAll();
}
}
- void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo bInfo) {
- synchronized (pendingIncrementalBR) {
- pendingIncrementalBR.put(
- bInfo.getBlock().getBlockId(), bInfo);
+ void notifyNamenodeDeletedBlock(
+ ReceivedDeletedBlockInfo bInfo, String storageUuid) {
+ synchronized (pendingIncrementalBRperStorage) {
+ getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
}
}
@@ -329,13 +348,13 @@ class BPServiceActor implements Runnable
*/
@VisibleForTesting
void triggerBlockReportForTests() {
- synchronized (pendingIncrementalBR) {
+ synchronized (pendingIncrementalBRperStorage) {
lastBlockReport = 0;
lastHeartbeat = 0;
- pendingIncrementalBR.notifyAll();
+ pendingIncrementalBRperStorage.notifyAll();
while (lastBlockReport == 0) {
try {
- pendingIncrementalBR.wait(100);
+ pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) {
return;
}
@@ -345,12 +364,12 @@ class BPServiceActor implements Runnable
@VisibleForTesting
void triggerHeartbeatForTests() {
- synchronized (pendingIncrementalBR) {
+ synchronized (pendingIncrementalBRperStorage) {
lastHeartbeat = 0;
- pendingIncrementalBR.notifyAll();
+ pendingIncrementalBRperStorage.notifyAll();
while (lastHeartbeat == 0) {
try {
- pendingIncrementalBR.wait(100);
+ pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) {
return;
}
@@ -360,13 +379,13 @@ class BPServiceActor implements Runnable
@VisibleForTesting
void triggerDeletionReportForTests() {
- synchronized (pendingIncrementalBR) {
+ synchronized (pendingIncrementalBRperStorage) {
lastDeletedReport = 0;
- pendingIncrementalBR.notifyAll();
+ pendingIncrementalBRperStorage.notifyAll();
while (lastDeletedReport == 0) {
try {
- pendingIncrementalBR.wait(100);
+ pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) {
return;
}
@@ -582,10 +601,10 @@ class BPServiceActor implements Runnable
//
long waitTime = dnConf.heartBeatInterval -
(Time.now() - lastHeartbeat);
- synchronized(pendingIncrementalBR) {
+ synchronized(pendingIncrementalBRperStorage) {
if (waitTime > 0 && pendingReceivedRequests == 0) {
try {
- pendingIncrementalBR.wait(waitTime);
+ pendingIncrementalBRperStorage.wait(waitTime);
} catch (InterruptedException ie) {
LOG.warn("BPOfferService for " + this + " interrupted");
}
@@ -756,4 +775,52 @@ class BPServiceActor implements Runnable
}
}
+ private static class PerStoragePendingIncrementalBR {
+ private Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR =
+ Maps.newHashMap();
+
+ /**
+ * Return the number of blocks on this storage that have pending
+ * incremental block reports.
+ * @return
+ */
+ int getBlockInfoCount() {
+ return pendingIncrementalBR.size();
+ }
+
+ /**
+ * Dequeue and return all pending incremental block report state.
+ * @return
+ */
+ ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
+ ReceivedDeletedBlockInfo[] blockInfos =
+ pendingIncrementalBR.values().toArray(
+ new ReceivedDeletedBlockInfo[getBlockInfoCount()]);
+
+ pendingIncrementalBR.clear();
+ return blockInfos;
+ }
+
+ /**
+ * Add blocks from blockArray to pendingIncrementalBR, unless the
+ * block already exists in pendingIncrementalBR.
+ * @param blockArray list of blocks to add.
+ */
+ void putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) {
+ for (ReceivedDeletedBlockInfo rdbi : blockArray) {
+ if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
+ pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
+ }
+ }
+ }
+
+ /**
+ * Add pending incremental block report for a single block.
+ * @param blockID
+ * @param blockInfo
+ */
+ void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
+ pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo);
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Oct 23 01:28:48 2013
@@ -162,7 +162,8 @@ class BlockReceiver implements Closeable
switch (stage) {
case PIPELINE_SETUP_CREATE:
replicaInfo = datanode.data.createRbw(block);
- datanode.notifyNamenodeReceivingBlock(block);
+ datanode.notifyNamenodeReceivingBlock(
+ block, replicaInfo.getStorageUuid());
break;
case PIPELINE_SETUP_STREAMING_RECOVERY:
replicaInfo = datanode.data.recoverRbw(
@@ -176,7 +177,8 @@ class BlockReceiver implements Closeable
block.getLocalBlock());
}
block.setGenerationStamp(newGs);
- datanode.notifyNamenodeReceivingBlock(block);
+ datanode.notifyNamenodeReceivingBlock(
+ block, replicaInfo.getStorageUuid());
break;
case PIPELINE_SETUP_APPEND_RECOVERY:
replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
@@ -185,7 +187,8 @@ class BlockReceiver implements Closeable
block.getLocalBlock());
}
block.setGenerationStamp(newGs);
- datanode.notifyNamenodeReceivingBlock(block);
+ datanode.notifyNamenodeReceivingBlock(
+ block, replicaInfo.getStorageUuid());
break;
case TRANSFER_RBW:
case TRANSFER_FINALIZED:
@@ -252,6 +255,10 @@ class BlockReceiver implements Closeable
/** Return the datanode object. */
DataNode getDataNode() {return datanode;}
+ public Replica getReplicaInfo() {
+ return replicaInfo;
+ }
+
/**
* close files.
*/
@@ -1072,7 +1079,8 @@ class BlockReceiver implements Closeable
: 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
- datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+ datanode.closeBlock(
+ block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
if (ClientTraceLog.isInfoEnabled() && isClient) {
long offset = 0;
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Oct 23 01:28:48 2013
@@ -520,10 +520,11 @@ public class DataNode extends Configured
}
// calls specific to BP
- protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+ protected void notifyNamenodeReceivedBlock(
+ ExtendedBlock block, String delHint, String storageUuid) {
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) {
- bpos.notifyNamenodeReceivedBlock(block, delHint);
+ bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
} else {
LOG.error("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId());
@@ -531,10 +532,11 @@ public class DataNode extends Configured
}
// calls specific to BP
- protected void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+ protected void notifyNamenodeReceivingBlock(
+ ExtendedBlock block, String storageUuid) {
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) {
- bpos.notifyNamenodeReceivingBlock(block);
+ bpos.notifyNamenodeReceivingBlock(block, storageUuid);
} else {
LOG.error("Cannot find BPOfferService for reporting block receiving for bpid="
+ block.getBlockPoolId());
@@ -542,10 +544,10 @@ public class DataNode extends Configured
}
/** Notify the corresponding namenode to delete the block. */
- public void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+ public void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if (bpos != null) {
- bpos.notifyNamenodeDeletedBlock(block);
+ bpos.notifyNamenodeDeletedBlock(block, storageUuid);
} else {
LOG.error("Cannot find BPOfferService for reporting block deleted for bpid="
+ block.getBlockPoolId());
@@ -1528,11 +1530,11 @@ public class DataNode extends Configured
* @param block
* @param delHint
*/
- void closeBlock(ExtendedBlock block, String delHint) {
+ void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
metrics.incrBlocksWritten();
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) {
- bpos.notifyNamenodeReceivedBlock(block, delHint);
+ bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
} else {
LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId());
@@ -1892,7 +1894,7 @@ public class DataNode extends Configured
ExtendedBlock newBlock = new ExtendedBlock(oldBlock);
newBlock.setGenerationStamp(recoveryId);
newBlock.setNumBytes(newLength);
- notifyNamenodeReceivedBlock(newBlock, "");
+ notifyNamenodeReceivedBlock(newBlock, "", storageID);
return storageID;
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Oct 23 01:28:48 2013
@@ -447,6 +447,7 @@ class DataXceiver extends Receiver imple
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
Status mirrorInStatus = SUCCESS;
+ Replica replica;
try {
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
@@ -457,8 +458,10 @@ class DataXceiver extends Receiver imple
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy);
+ replica = blockReceiver.getReplicaInfo();
} else {
- datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
+ replica =
+ datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
}
//
@@ -590,7 +593,8 @@ class DataXceiver extends Receiver imple
// the block is finalized in the PacketResponder.
if (isDatanode ||
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
- datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+ datanode.closeBlock(
+ block, DataNode.EMPTY_DEL_HINT, replica.getStorageUuid());
LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
+ localAddress + " of size " + block.getNumBytes());
}
@@ -859,7 +863,8 @@ class DataXceiver extends Receiver imple
dataXceiverServer.balanceThrottler, null);
// notify name node
- datanode.notifyNamenodeReceivedBlock(block, delHint);
+ datanode.notifyNamenodeReceivedBlock(
+ block, delHint, blockReceiver.getReplicaInfo().getStorageUuid());
LOG.info("Moved " + block + " from " + peer.getRemoteAddressString());
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java Wed Oct 23 01:28:48 2013
@@ -54,4 +54,9 @@ public interface Replica {
* @return the number of bytes that are visible to readers
*/
public long getVisibleLength();
+
+ /**
+ * Return the storageUuid of the volume that stores this replica.
+ */
+ public String getStorageUuid();
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Wed Oct 23 01:28:48 2013
@@ -137,6 +137,14 @@ abstract public class ReplicaInfo extend
void setVolume(FsVolumeSpi vol) {
this.volume = vol;
}
+
+ /**
+ * Get the storageUuid of the volume that stores this replica.
+ */
+ @Override
+ public String getStorageUuid() {
+ return volume.getStorageID();
+ }
/**
* Return the parent directory path where this replica is located
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Wed Oct 23 01:28:48 2013
@@ -243,7 +243,7 @@ public interface FsDatasetSpi<V extends
* @param expectedBlockLen the number of bytes the replica is expected to have
* @throws IOException
*/
- public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
+ public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
) throws IOException;
/**
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java Wed Oct 23 01:28:48 2013
@@ -26,6 +26,9 @@ import org.apache.hadoop.hdfs.StorageTyp
* This is an interface for the underlying volume.
*/
public interface FsVolumeSpi {
+ /** @return the StorageUuid of the volume */
+ public String getStorageID();
+
/** @return a list of block pools. */
public String[] getBlockPoolList();
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java Wed Oct 23 01:28:48 2013
@@ -195,7 +195,7 @@ class FsDatasetAsyncDiskService {
+ " at file " + blockFile + ". Ignored.");
} else {
if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
- datanode.notifyNamenodeDeletedBlock(block);
+ datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
}
volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
LOG.info("Deleted " + block.getBlockPoolId() + " "
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Wed Oct 23 01:28:48 2013
@@ -699,7 +699,7 @@ class FsDatasetImpl implements FsDataset
}
@Override // FsDatasetSpi
- public void recoverClose(ExtendedBlock b, long newGS,
+ public Replica recoverClose(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException {
LOG.info("Recover failed close " + b);
// check replica's state
@@ -710,6 +710,7 @@ class FsDatasetImpl implements FsDataset
if (replicaInfo.getState() == ReplicaState.RBW) {
finalizeReplica(b.getBlockPoolId(), replicaInfo);
}
+ return replicaInfo;
}
/**
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Wed Oct 23 01:28:48 2013
@@ -290,6 +290,7 @@ class FsVolumeImpl implements FsVolumeSp
}
}
+ @Override
public String getStorageID() {
return storageID;
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Wed Oct 23 01:28:48 2013
@@ -132,6 +132,11 @@ public class SimulatedFSDataset implemen
}
@Override
+ public String getStorageUuid() {
+ return storage.getStorageUuid();
+ }
+
+ @Override
synchronized public long getGenerationStamp() {
return theBlock.getGenerationStamp();
}
@@ -314,6 +319,8 @@ public class SimulatedFSDataset implemen
private static class SimulatedStorage {
private Map<String, SimulatedBPStorage> map =
new HashMap<String, SimulatedBPStorage>();
+ private final String storageUuid = "SimulatedStorage-UUID";
+
private long capacity; // in bytes
synchronized long getFree() {
@@ -375,6 +382,10 @@ public class SimulatedFSDataset implemen
}
return bpStorage;
}
+
+ public String getStorageUuid() {
+ return storageUuid;
+ }
}
private final Map<String, Map<Block, BInfo>> blockMap
@@ -625,7 +636,7 @@ public class SimulatedFSDataset implemen
}
@Override // FsDatasetSpi
- public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
+ public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
@@ -639,6 +650,7 @@ public class SimulatedFSDataset implemen
map.remove(b.getLocalBlock());
binfo.theBlock.setGenerationStamp(newGS);
map.put(binfo.theBlock, binfo);
+ return binfo;
}
@Override // FsDatasetSpi
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java Wed Oct 23 01:28:48 2013
@@ -176,7 +176,7 @@ public class TestBPOfferService {
waitForBlockReport(mockNN2);
// When we receive a block, it should report it to both NNs
- bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "");
+ bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "", "");
ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
assertEquals(1, ret.length);
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=1534891&r1=1534890&r2=1534891&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Wed Oct 23 01:28:48 2013
@@ -412,6 +412,11 @@ public class TestDirectoryScanner {
public StorageType getStorageType() {
return StorageType.DEFAULT;
}
+
+ @Override
+ public String getStorageID() {
+ return "";
+ }
}
private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();