You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/02/22 21:31:15 UTC
svn commit: r1073489 [1/2] - in /hadoop/hdfs/branches/HDFS-1052: ./
src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/datanode/
src/java/org/apache/hadoop/hdfs/server/namenode/
src/java/org/apache/hadoop/hdfs/server/proto...
Author: suresh
Date: Tue Feb 22 20:31:14 2011
New Revision: 1073489
URL: http://svn.apache.org/viewvc?rev=1073489&view=rev
Log:
HDFS-1450. Federation: Introduce block pool ID into FSDatasetInterface. (suresh)
Modified:
hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDatanodeDescriptor.java
hadoop/hdfs/branches/HDFS-1052/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Tue Feb 22 20:31:14 2011
@@ -4,6 +4,9 @@ Trunk (unreleased changes)
NEW FEATURES
+ HDFS-1450. Federation: Introduce block pool ID into FSDatasetInterface.
+ (suresh)
+
IMPROVEMENTS
HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java Tue Feb 22 20:31:14 2011
@@ -54,6 +54,11 @@ public class ExtendedBlock implements Wr
this("TODO", b);
}
+ // TODO:FEDERATION To remove when block pool ID related coding is complete
+ public ExtendedBlock(final long blkId) {
+ this("TODO", new Block(blkId));
+ }
+
public ExtendedBlock(final ExtendedBlock b) {
this(b.poolId, b.block);
}
@@ -128,9 +133,9 @@ public class ExtendedBlock implements Wr
block.setNumBytes(len);
}
- public void set(String poolId, long blkid, long gs, long len) {
+ public void set(String poolId, Block blk) {
this.poolId = poolId;
- block.set(blkid, gs, len);
+ this.block = blk;
}
public static Block getLocalBlock(final ExtendedBlock b) {
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Feb 22 20:31:14 2011
@@ -35,7 +35,6 @@ import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.FSOutputSummer;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
@@ -98,30 +97,31 @@ class BlockReceiver implements java.io.C
//
// Open local disk out
//
- // TODO:FEDERATION use ExtendedBlock in the following method calls
if (clientName.length() == 0) { //replication or move
- replicaInfo = datanode.data.createTemporary(block.getLocalBlock());
+ replicaInfo = datanode.data.createTemporary(block);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
- replicaInfo = datanode.data.createRbw(block.getLocalBlock());
+ replicaInfo = datanode.data.createRbw(block);
break;
case PIPELINE_SETUP_STREAMING_RECOVERY:
replicaInfo = datanode.data.recoverRbw(
- block.getLocalBlock(), newGs, minBytesRcvd, maxBytesRcvd);
+ block, newGs, minBytesRcvd, maxBytesRcvd);
block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND:
- replicaInfo = datanode.data.append(block.getLocalBlock(), newGs, minBytesRcvd);
+ replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
if (datanode.blockScanner != null) { // remove from block scanner
- datanode.blockScanner.deleteBlock(block.getLocalBlock());
+ datanode.blockScanner.deleteBlock(block.getPoolId(),
+ block.getLocalBlock());
}
block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND_RECOVERY:
- replicaInfo = datanode.data.recoverAppend(block.getLocalBlock(), newGs, minBytesRcvd);
+ replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
if (datanode.blockScanner != null) { // remove from block scanner
- datanode.blockScanner.deleteBlock(block.getLocalBlock());
+ datanode.blockScanner.deleteBlock(block.getPoolId(),
+ block.getLocalBlock());
}
block.setGenerationStamp(newGs);
break;
@@ -615,7 +615,7 @@ class BlockReceiver implements java.io.C
try {
if (clientName.length() > 0) {
responder = new Daemon(datanode.threadGroup,
- new PacketResponder(this, block.getLocalBlock(), mirrIn, replyOut,
+ new PacketResponder(this, block, mirrIn, replyOut,
numTargets, Thread.currentThread()));
responder.start(); // start thread to processes reponses
}
@@ -642,8 +642,7 @@ class BlockReceiver implements java.io.C
// Finalize the block. Does this fsync()?
block.setNumBytes(replicaInfo.getNumBytes());
- // TODO:FEDERATION use ExtendedBlock
- datanode.data.finalizeBlock(block.getLocalBlock());
+ datanode.data.finalizeBlock(block);
datanode.myMetrics.blocksWritten.inc();
}
@@ -675,8 +674,7 @@ class BlockReceiver implements java.io.C
*/
private void cleanupBlock() throws IOException {
if (clientName.length() == 0) { // not client write
- // TODO:FEDERATION use ExtendedBlock
- datanode.data.unfinalizeBlock(block.getLocalBlock());
+ datanode.data.unfinalizeBlock(block);
}
}
@@ -693,8 +691,7 @@ class BlockReceiver implements java.io.C
}
// rollback the position of the meta file
- // TODO:FEDERATION use ExtendedBlock
- datanode.data.adjustCrcChannelPosition(block.getLocalBlock(), streams, checksumSize);
+ datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
}
/**
@@ -722,8 +719,7 @@ class BlockReceiver implements java.io.C
byte[] crcbuf = new byte[checksumSize];
FSDataset.BlockInputStreams instr = null;
try {
- // TODO:FEDERATION use ExtendedBlock
- instr = datanode.data.getTmpInputStreams(block.getLocalBlock(), blkoff, ckoff);
+ instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
// open meta file and read in crc value computer earlier
@@ -758,7 +754,7 @@ class BlockReceiver implements java.io.C
//packet waiting for ack
private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
private volatile boolean running = true;
- private Block block;
+ private ExtendedBlock block;
DataInputStream mirrorIn; // input from downstream datanode
DataOutputStream replyOut; // output to upstream datanode
private int numTargets; // number of downstream datanodes including myself
@@ -769,7 +765,7 @@ class BlockReceiver implements java.io.C
return "PacketResponder " + numTargets + " for Block " + this.block;
}
- PacketResponder(BlockReceiver receiver, Block b, DataInputStream in,
+ PacketResponder(BlockReceiver receiver, ExtendedBlock b, DataInputStream in,
DataOutputStream out, int numTargets,
Thread receiverThread) {
this.receiverThread = receiverThread;
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Feb 22 20:31:14 2011
@@ -145,10 +145,9 @@ class BlockSender implements java.io.Clo
this.transferToAllowed = datanode.transferToAllowed;
this.clientTraceFmt = clientTraceFmt;
- // TODO:FEDERATION metaFileExists and getMetaDataInputStream should take ExtendedBlock
- if ( !corruptChecksumOk || datanode.data.metaFileExists(block.getLocalBlock()) ) {
+ if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
- .getMetaDataInputStream(block.getLocalBlock()), BUFFER_SIZE));
+ .getMetaDataInputStream(block), BUFFER_SIZE));
// read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
@@ -230,8 +229,7 @@ class BlockSender implements java.io.Clo
DataNode.LOG.debug("replica=" + replica);
}
- // TODO:FEDERATION getBlockInputStream must acccept ExtendedBlock
- blockIn = datanode.data.getBlockInputStream(block.getLocalBlock(), offset); // seek to offset
+ blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
} catch (IOException ioe) {
IOUtils.closeStream(this);
IOUtils.closeStream(blockIn);
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Tue Feb 22 20:31:14 2011
@@ -267,18 +267,20 @@ class DataBlockScanner implements Runnab
}
/** Adds block to list of blocks */
- synchronized void addBlock(Block block) {
+ synchronized void addBlock(ExtendedBlock block) {
if (!isInitialized()) {
return;
}
- BlockScanInfo info = blockMap.get(block);
+ // TODO:FEDERATION use ExtendedBlock
+ BlockScanInfo info = blockMap.get(block.getLocalBlock());
if ( info != null ) {
LOG.warn("Adding an already existing block " + block);
delBlockInfo(info);
}
- info = new BlockScanInfo(block);
+ // TODO:FEDERATION use ExtendedBlock
+ info = new BlockScanInfo(block.getLocalBlock());
info.lastScanTime = getNewBlockScanTime();
addBlockInfo(info);
@@ -286,10 +288,11 @@ class DataBlockScanner implements Runnab
}
/** Deletes the block from internal structures */
- synchronized void deleteBlock(Block block) {
+ synchronized void deleteBlock(String bpid, Block block) {
if (!isInitialized()) {
return;
}
+ // FEDERATION:TODO use bpid
BlockScanInfo info = blockMap.get(block);
if ( info != null ) {
delBlockInfo(info);
@@ -306,9 +309,9 @@ class DataBlockScanner implements Runnab
}
/** Deletes blocks from internal structures */
- void deleteBlocks(Block[] blocks) {
+ void deleteBlocks(String bpid, Block[] blocks) {
for ( Block b : blocks ) {
- deleteBlock(b);
+ deleteBlock(bpid, b);
}
}
@@ -359,7 +362,7 @@ class DataBlockScanner implements Runnab
}
}
- private void handleScanFailure(Block block) {
+ private void handleScanFailure(ExtendedBlock block) {
LOG.info("Reporting bad block " + block + " to namenode.");
@@ -422,8 +425,7 @@ class DataBlockScanner implements Runnab
throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
}
- private void verifyBlock(Block block) {
-
+ private void verifyBlock(ExtendedBlock block) {
BlockSender blockSender = null;
/* In case of failure, attempt to read second time to reduce
@@ -436,9 +438,8 @@ class DataBlockScanner implements Runnab
try {
adjustThrottler();
- // TODO:FEDERATION use ExtendedBlock
- blockSender = new BlockSender(new ExtendedBlock(block), 0, -1, false,
- false, true, datanode);
+ blockSender = new BlockSender(block, 0, -1, false, false, true,
+ datanode);
DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream());
@@ -452,18 +453,19 @@ class DataBlockScanner implements Runnab
totalTransientErrors++;
}
- updateScanStatus(block, ScanType.VERIFICATION_SCAN, true);
+ // TODO:FEDERATION use Extended block
+ updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, true);
return;
} catch (IOException e) {
-
- updateScanStatus(block, ScanType.VERIFICATION_SCAN, false);
+ // TODO:FEDERATION use Extended block
+ updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, false);
// If the block does not exists anymore, then its not an error
- if ( dataset.getFile(block) == null ) {
+ if ( dataset.getFile(block.getLocalBlock()) == null ) {
LOG.info("Verification failed for " + block + ". Its ok since " +
"it not in datanode dataset anymore.");
- deleteBlock(block);
+ deleteBlock(block.getPoolId(), block.getLocalBlock());
return;
}
@@ -503,7 +505,8 @@ class DataBlockScanner implements Runnab
}
if ( block != null ) {
- verifyBlock(block);
+ // TODO:FEDERATION blockInfoSet should use ExtendedBlock
+ verifyBlock(new ExtendedBlock(block));
}
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Feb 22 20:31:14 2011
@@ -1008,7 +1008,7 @@ public class DataNode extends Configured
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER:
// Send a copy of a block to another datanode
- transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
+ transferBlocks(bcmd.getPoolId(), bcmd.getBlocks(), bcmd.getTargets());
myMetrics.blocksReplicated.inc(bcmd.getBlocks().length);
break;
case DatanodeProtocol.DNA_INVALIDATE:
@@ -1019,9 +1019,9 @@ public class DataNode extends Configured
Block toDelete[] = bcmd.getBlocks();
try {
if (blockScanner != null) {
- blockScanner.deleteBlocks(toDelete);
+ blockScanner.deleteBlocks(bcmd.getPoolId(), toDelete);
}
- data.invalidate(toDelete);
+ data.invalidate(bcmd.getPoolId(), toDelete);
} catch(IOException e) {
checkDiskError();
throw e;
@@ -1171,8 +1171,7 @@ public class DataNode extends Configured
private void transferBlock( ExtendedBlock block,
DatanodeInfo xferTargets[]
) throws IOException {
- // TODO:FEDERATION use ExtendedBlock
- if (!data.isValidBlock(block.getLocalBlock())) {
+ if (!data.isValidBlock(block)) {
// block does not exist or is under-construction
String errStr = "Can't send invalid block " + block;
LOG.info(errStr);
@@ -1183,8 +1182,7 @@ public class DataNode extends Configured
}
// Check if NN recorded length matches on-disk length
- // TODO:FEDERATION use ExtendedBlock
- long onDiskLength = data.getLength(block.getLocalBlock());
+ long onDiskLength = data.getLength(block);
if (block.getNumBytes() > onDiskLength) {
// Shorter on-disk len indicates corruption so report NN the corrupt block
namenode.reportBadBlocks(new LocatedBlock[]{
@@ -1212,13 +1210,11 @@ public class DataNode extends Configured
}
}
- private void transferBlocks( Block blocks[],
- DatanodeInfo xferTargets[][]
- ) {
+ private void transferBlocks(String poolId, Block blocks[],
+ DatanodeInfo xferTargets[][]) {
for (int i = 0; i < blocks.length; i++) {
try {
- // TODO:FEDERATION cleanup
- transferBlock(new ExtendedBlock(blocks[i]), xferTargets[i]);
+ transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]);
} catch (IOException ie) {
LOG.warn("Failed to transfer block " + blocks[i], ie);
}
@@ -1230,13 +1226,15 @@ public class DataNode extends Configured
* till namenode is informed before responding with success to the
* client? For now we don't.
*/
- protected void notifyNamenodeReceivedBlock(Block block, String delHint) {
+ protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
if(block==null || delHint==null) {
throw new IllegalArgumentException(block==null?"Block is null":"delHint is null");
}
synchronized (receivedBlockList) {
synchronized (delHints) {
- receivedBlockList.add(block);
+ // TODO:FEDERATION receivedBlockList should be per block pool
+ // TODO:FEDERATION use ExtendedBlock
+ receivedBlockList.add(block.getLocalBlock());
delHints.add(delHint);
receivedBlockList.notifyAll();
}
@@ -1413,7 +1411,7 @@ public class DataNode extends Configured
* @param block
* @param delHint
*/
- void closeBlock(Block block, String delHint) {
+ void closeBlock(ExtendedBlock block, String delHint) {
myMetrics.blocksWritten.inc();
notifyNamenodeReceivedBlock(block, delHint);
if (blockScanner != null) {
@@ -1736,7 +1734,7 @@ public class DataNode extends Configured
public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId,
long newLength) throws IOException {
- ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock.getLocalBlock(),
+ ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock,
recoveryId, newLength);
return new ExtendedBlock(oldBlock.getPoolId(), r);
}
@@ -1953,7 +1951,7 @@ public class DataNode extends Configured
}
}
- return data.getReplicaVisibleLength(block.getLocalBlock());
+ return data.getReplicaVisibleLength(block);
}
// Determine a Datanode's streaming address
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Feb 22 20:31:14 2011
@@ -274,7 +274,7 @@ class DataXceiver extends DataTransferPr
client, srcDataNode, datanode);
} else {
// TODO:FEDERATION use ExtendedBlock
- datanode.data.recoverClose(block.getLocalBlock(), newGs, minBytesRcvd);
+ datanode.data.recoverClose(block, newGs, minBytesRcvd);
}
//
@@ -378,7 +378,7 @@ class DataXceiver extends DataTransferPr
if (client.length() == 0 ||
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// TODO:FEDERATION use ExtendedBlock
- datanode.closeBlock(block.getLocalBlock(), DataNode.EMPTY_DEL_HINT);
+ datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
LOG.info("Received block " + block +
" src: " + remoteAddress +
" dest: " + localAddress +
@@ -431,9 +431,8 @@ class DataXceiver extends DataTransferPr
}
}
- // TODO:FEDERATION use ExtendedBlock
final MetaDataInputStream metadataIn =
- datanode.data.getMetaDataInputStream(block.getLocalBlock());
+ datanode.data.getMetaDataInputStream(block);
final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
metadataIn, BUFFER_SIZE));
@@ -620,7 +619,7 @@ class DataXceiver extends DataTransferPr
// notify name node
// TODO:FEDERATION use ExtendedBlock
- datanode.notifyNamenodeReceivedBlock(block.getLocalBlock(), sourceID);
+ datanode.notifyNamenodeReceivedBlock(block, sourceID);
LOG.info("Moved block " + block +
" from " + s.getRemoteSocketAddress());
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Feb 22 20:31:14 2011
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -767,7 +768,9 @@ public class FSDataset implements FSCons
}
@Override // FSDatasetInterface
- public synchronized Block getStoredBlock(long blkid) throws IOException {
+ public synchronized Block getStoredBlock(String bpid, long blkid)
+ throws IOException {
+ // TODO:FEDERATION use extended block
File blockfile = findBlockFile(blkid);
if (blockfile == null) {
return null;
@@ -803,20 +806,21 @@ public class FSDataset implements FSCons
}
@Override // FSDatasetInterface
- public boolean metaFileExists(Block b) throws IOException {
- return getMetaFile(b).exists();
+ public boolean metaFileExists(ExtendedBlock b) throws IOException {
+ // TODO:FEDERATION use ExtendedBlock
+ return getMetaFile(b.getLocalBlock()).exists();
}
@Override // FSDatasetInterface
- public long getMetaDataLength(Block b) throws IOException {
- File checksumFile = getMetaFile( b );
+ public long getMetaDataLength(ExtendedBlock b) throws IOException {
+ File checksumFile = getMetaFile(b.getLocalBlock());
return checksumFile.length();
}
@Override // FSDatasetInterface
- public MetaDataInputStream getMetaDataInputStream(Block b)
+ public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b)
throws IOException {
- File checksumFile = getMetaFile( b );
+ File checksumFile = getMetaFile(b.getLocalBlock());
return new MetaDataInputStream(new FileInputStream(checksumFile),
checksumFile.length());
}
@@ -924,8 +928,8 @@ public class FSDataset implements FSCons
* Find the block's on-disk length
*/
@Override // FSDatasetInterface
- public long getLength(Block b) throws IOException {
- return getBlockFile(b).length();
+ public long getLength(ExtendedBlock b) throws IOException {
+ return getBlockFile(b.getLocalBlock()).length();
}
/**
@@ -943,14 +947,15 @@ public class FSDataset implements FSCons
}
@Override // FSDatasetInterface
- public synchronized InputStream getBlockInputStream(Block b) throws IOException {
- return new FileInputStream(getBlockFile(b));
+ public synchronized InputStream getBlockInputStream(ExtendedBlock b)
+ throws IOException {
+ return new FileInputStream(getBlockFile(b.getLocalBlock()));
}
@Override // FSDatasetInterface
- public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException {
-
- File blockFile = getBlockFile(b);
+ public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+ long seekOffset) throws IOException {
+ File blockFile = getBlockFile(b.getLocalBlock());
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
if (seekOffset > 0) {
blockInFile.seek(seekOffset);
@@ -977,10 +982,9 @@ public class FSDataset implements FSCons
* Returns handles to the block file and its metadata file
*/
@Override // FSDatasetInterface
- public synchronized BlockInputStreams getTmpInputStreams(Block b,
+ public synchronized BlockInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long ckoff) throws IOException {
-
- ReplicaInfo info = getReplicaInfo(b);
+ ReplicaInfo info = getReplicaInfo(b.getLocalBlock());
File blockFile = info.getBlockFile();
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
if (blkOffset > 0) {
@@ -1080,7 +1084,7 @@ public class FSDataset implements FSCons
}
@Override // FSDatasetInterface
- public synchronized ReplicaInPipelineInterface append(Block b,
+ public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
@@ -1093,7 +1097,8 @@ public class FSDataset implements FSCons
throw new IOException("The new generation stamp " + newGS +
" should be greater than the replica " + b + "'s generation stamp");
}
- ReplicaInfo replicaInfo = volumeMap.get(b);
+ // TODO:FEDERATION use ExtendedBlock
+ ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock());
if (replicaInfo == null) {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
@@ -1225,11 +1230,13 @@ public class FSDataset implements FSCons
return replicaInfo;
}
@Override // FSDatasetInterface
- public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+ public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
DataNode.LOG.info("Recover failed append to " + b);
- ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+ // TODO:FEDERATION use ExtendedBlock
+ ReplicaInfo replicaInfo = recoverCheck(b.getLocalBlock(), newGS,
+ expectedBlockLen);
// change the replica's state/gs etc.
if (replicaInfo.getState() == ReplicaState.FINALIZED ) {
@@ -1241,16 +1248,17 @@ public class FSDataset implements FSCons
}
@Override // FSDatasetInterface
- public void recoverClose(Block b, long newGS,
+ public void recoverClose(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException {
DataNode.LOG.info("Recover failed close " + b);
// check replica's state
- ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+ ReplicaInfo replicaInfo = recoverCheck(b.getLocalBlock(), newGS,
+ expectedBlockLen);
// bump the replica's GS
bumpReplicaGS(replicaInfo, newGS);
// finalize the replica if RBW
if (replicaInfo.getState() == ReplicaState.RBW) {
- finalizeBlock(replicaInfo);
+ finalizeReplica(replicaInfo);
}
}
@@ -1282,7 +1290,7 @@ public class FSDataset implements FSCons
}
@Override // FSDatasetInterface
- public synchronized ReplicaInPipelineInterface createRbw(Block b)
+ public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
if (replicaInfo != null) {
@@ -1293,7 +1301,7 @@ public class FSDataset implements FSCons
// create a new block
FSVolume v = volumes.getNextVolume(b.getNumBytes());
// create a rbw file to hold block in the designated volume
- File f = v.createRbwFile(b);
+ File f = v.createRbwFile(b.getLocalBlock());
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile());
volumeMap.add(newReplicaInfo);
@@ -1301,7 +1309,7 @@ public class FSDataset implements FSCons
}
@Override // FSDatasetInterface
- public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+ public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
DataNode.LOG.info("Recover the RBW replica " + b);
@@ -1350,7 +1358,7 @@ public class FSDataset implements FSCons
}
@Override // FSDatasetInterface
- public synchronized ReplicaInPipelineInterface createTemporary(Block b)
+ public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
if (replicaInfo != null) {
@@ -1361,7 +1369,7 @@ public class FSDataset implements FSCons
FSVolume v = volumes.getNextVolume(b.getNumBytes());
// create a temporary file to hold block in the designated volume
- File f = v.createTmpFile(b);
+ File f = v.createTmpFile(b.getLocalBlock());
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile());
volumeMap.add(newReplicaInfo);
@@ -1374,7 +1382,7 @@ public class FSDataset implements FSCons
* last checksum will be overwritten.
*/
@Override // FSDatasetInterface
- public void adjustCrcChannelPosition(Block b, BlockWriteStreams streams,
+ public void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams streams,
int checksumSize) throws IOException {
FileOutputStream file = (FileOutputStream) streams.checksumOut;
FileChannel channel = file.getChannel();
@@ -1407,8 +1415,8 @@ public class FSDataset implements FSCons
* Complete the block write!
*/
@Override // FSDatasetInterface
- public synchronized void finalizeBlock(Block b) throws IOException {
- ReplicaInfo replicaInfo = getReplicaInfo(b);
+ public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
+ ReplicaInfo replicaInfo = getReplicaInfo(b.getLocalBlock());
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
// this is legal, when recovery happens on a file that has
// been opened for append but never modified
@@ -1444,15 +1452,15 @@ public class FSDataset implements FSCons
* Remove the temporary block file (if any)
*/
@Override // FSDatasetInterface
- public synchronized void unfinalizeBlock(Block b) throws IOException {
- ReplicaInfo replicaInfo = volumeMap.get(b);
+ public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
+ ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock());
if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
// remove from volumeMap
- volumeMap.remove(b);
+ volumeMap.remove(b.getLocalBlock());
// delete the on-disk temp file
if (delBlockFromDisk(replicaInfo.getBlockFile(),
- replicaInfo.getMetaFile(), b)) {
+ replicaInfo.getMetaFile(), b.getLocalBlock())) {
DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
}
}
@@ -1550,8 +1558,8 @@ public class FSDataset implements FSCons
* valid means finalized
*/
@Override // FSDatasetInterface
- public boolean isValidBlock(Block b) {
- ReplicaInfo replicaInfo = volumeMap.get(b);
+ public boolean isValidBlock(ExtendedBlock b) {
+ ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock());
if (replicaInfo == null ||
replicaInfo.getState() != ReplicaState.FINALIZED) {
return false;
@@ -1609,7 +1617,7 @@ public class FSDataset implements FSCons
* just get rid of it.
*/
@Override // FSDatasetInterface
- public void invalidate(Block invalidBlks[]) throws IOException {
+ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
boolean error = false;
for (int i = 0; i < invalidBlks.length; i++) {
File f = null;
@@ -1850,7 +1858,8 @@ public class FSDataset implements FSCons
// Remove the block from volumeMap
volumeMap.remove(blockId);
if (datanode.blockScanner != null) {
- datanode.blockScanner.deleteBlock(new Block(blockId));
+ // TODO:FEDERATION pass the right bpid
+ datanode.blockScanner.deleteBlock("TODO", new Block(blockId));
}
DataNode.LOG.warn("Removed block " + blockId
+ " from memory with missing block file on the disk");
@@ -1872,7 +1881,7 @@ public class FSDataset implements FSCons
diskFile.length(), diskGS, vol, diskFile.getParentFile());
volumeMap.add(diskBlockInfo);
if (datanode.blockScanner != null) {
- datanode.blockScanner.addBlock(diskBlockInfo);
+ datanode.blockScanner.addBlock(new ExtendedBlock(diskBlockInfo));
}
DataNode.LOG.warn("Added missing block to memory " + (Block)diskBlockInfo);
return;
@@ -2041,7 +2050,7 @@ public class FSDataset implements FSCons
@Override // FSDatasetInterface
public synchronized ReplicaInfo updateReplicaUnderRecovery(
- final Block oldBlock,
+ final ExtendedBlock oldBlock,
final long recoveryId,
final long newlength) throws IOException {
//get replica
@@ -2112,7 +2121,7 @@ public class FSDataset implements FSCons
}
@Override // FSDatasetInterface
- public synchronized long getReplicaVisibleLength(final Block block)
+ public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException {
final Replica replica = volumeMap.get(block.getBlockId());
if (replica == null) {
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Tue Feb 22 20:31:14 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -50,7 +51,7 @@ public interface FSDatasetInterface exte
* @return the length of the metadata file for the specified block.
* @throws IOException
*/
- public long getMetaDataLength(Block b) throws IOException;
+ public long getMetaDataLength(ExtendedBlock b) throws IOException;
/**
* This class provides the input stream and length of the metadata
@@ -75,7 +76,7 @@ public interface FSDatasetInterface exte
* @return the metadata input stream;
* @throws IOException
*/
- public MetaDataInputStream getMetaDataInputStream(Block b)
+ public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b)
throws IOException;
/**
@@ -84,7 +85,7 @@ public interface FSDatasetInterface exte
* @return true of the metafile for specified block exits
* @throws IOException
*/
- public boolean metaFileExists(Block b) throws IOException;
+ public boolean metaFileExists(ExtendedBlock b) throws IOException;
/**
@@ -93,7 +94,7 @@ public interface FSDatasetInterface exte
* @return the specified block's on-disk length (excluding metadta)
* @throws IOException
*/
- public long getLength(Block b) throws IOException;
+ public long getLength(ExtendedBlock b) throws IOException;
/**
* Get reference to the replica meta info in the replicasMap.
@@ -107,7 +108,8 @@ public interface FSDatasetInterface exte
/**
* @return the generation stamp stored with the block.
*/
- public Block getStoredBlock(long blkid) throws IOException;
+ public Block getStoredBlock(String poolId, long blkid)
+ throws IOException;
/**
* Returns an input stream to read the contents of the specified block
@@ -115,7 +117,7 @@ public interface FSDatasetInterface exte
* @return an input stream to read the contents of the specified block
* @throws IOException
*/
- public InputStream getBlockInputStream(Block b) throws IOException;
+ public InputStream getBlockInputStream(ExtendedBlock b) throws IOException;
/**
* Returns an input stream at specified offset of the specified block
@@ -125,7 +127,7 @@ public interface FSDatasetInterface exte
* starting at the offset
* @throws IOException
*/
- public InputStream getBlockInputStream(Block b, long seekOffset)
+ public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset)
throws IOException;
/**
@@ -138,8 +140,8 @@ public interface FSDatasetInterface exte
* starting at the offset
* @throws IOException
*/
- public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff)
- throws IOException;
+ public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
+ long ckoff) throws IOException;
/**
*
@@ -188,7 +190,7 @@ public interface FSDatasetInterface exte
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public ReplicaInPipelineInterface createTemporary(Block b)
+ public ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
throws IOException;
/**
@@ -198,7 +200,7 @@ public interface FSDatasetInterface exte
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public ReplicaInPipelineInterface createRbw(Block b) throws IOException;
+ public ReplicaInPipelineInterface createRbw(ExtendedBlock b) throws IOException;
/**
* Recovers a RBW replica and returns the meta info of the replica
@@ -210,7 +212,7 @@ public interface FSDatasetInterface exte
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public ReplicaInPipelineInterface recoverRbw(Block b,
+ public ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException;
@@ -223,7 +225,7 @@ public interface FSDatasetInterface exte
* @return the meata info of the replica which is being written to
* @throws IOException
*/
- public ReplicaInPipelineInterface append(Block b,
+ public ReplicaInPipelineInterface append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException;
/**
@@ -236,7 +238,7 @@ public interface FSDatasetInterface exte
* @return the meta info of the replica which is being written to
* @throws IOException
*/
- public ReplicaInPipelineInterface recoverAppend(Block b,
+ public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException;
/**
@@ -248,7 +250,7 @@ public interface FSDatasetInterface exte
* @param expectedBlockLen the number of bytes the replica is expected to have
* @throws IOException
*/
- public void recoverClose(Block b,
+ public void recoverClose(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException;
/**
@@ -258,7 +260,7 @@ public interface FSDatasetInterface exte
* @param b
* @throws IOException
*/
- public void finalizeBlock(Block b) throws IOException;
+ public void finalizeBlock(ExtendedBlock b) throws IOException;
/**
* Unfinalizes the block previously opened for writing using writeToBlock.
@@ -266,7 +268,7 @@ public interface FSDatasetInterface exte
* @param b
* @throws IOException
*/
- public void unfinalizeBlock(Block b) throws IOException;
+ public void unfinalizeBlock(ExtendedBlock b) throws IOException;
/**
* Returns the block report - the full list of blocks stored
@@ -279,14 +281,14 @@ public interface FSDatasetInterface exte
* @param b
* @return - true if the specified block is valid
*/
- public boolean isValidBlock(Block b);
+ public boolean isValidBlock(ExtendedBlock b);
/**
* Invalidates the specified blocks
* @param invalidBlks - the blocks to be invalidated
* @throws IOException
*/
- public void invalidate(Block invalidBlks[]) throws IOException;
+ public void invalidate(String poolId, Block invalidBlks[]) throws IOException;
/**
* Check if all the data directories are healthy
@@ -312,7 +314,7 @@ public interface FSDatasetInterface exte
* @param checksumSize number of bytes each checksum has
* @throws IOException
*/
- public void adjustCrcChannelPosition(Block b, BlockWriteStreams stream,
+ public void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams stream,
int checksumSize) throws IOException;
/**
@@ -324,7 +326,7 @@ public interface FSDatasetInterface exte
/**
* Get visible length of the specified replica.
*/
- long getReplicaVisibleLength(final Block block) throws IOException;
+ long getReplicaVisibleLength(final ExtendedBlock block) throws IOException;
/**
* Initialize a replica recovery.
@@ -339,7 +341,7 @@ public interface FSDatasetInterface exte
* Update replica's generation stamp and length and finalize it.
*/
public ReplicaInfo updateReplicaUnderRecovery(
- Block oldBlock,
+ ExtendedBlock oldBlock,
long recoveryId,
long newLength) throws IOException;
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Tue Feb 22 20:31:14 2011
@@ -350,10 +350,8 @@ public class DatanodeDescriptor extends
}
}
- BlockCommand getReplicationCommand(int maxTransfers) {
- List<BlockTargetPair> blocktargetlist = replicateBlocks.poll(maxTransfers);
- return blocktargetlist == null? null:
- new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
+ List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
+ return replicateBlocks.poll(maxTransfers);
}
BlockRecoveryCommand getLeaseRecoveryCommand(int maxTransfers) {
@@ -371,10 +369,8 @@ public class DatanodeDescriptor extends
/**
* Remove the specified number of blocks to be invalidated
*/
- BlockCommand getInvalidateBlocks(int maxblocks) {
- Block[] deleteList = getBlockArray(invalidateBlocks, maxblocks);
- return deleteList == null?
- null: new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, deleteList);
+ Block[] getInvalidateBlocks(int maxblocks) {
+ return getBlockArray(invalidateBlocks, maxblocks);
}
static private Block[] getBlockArray(Collection<Block> blocks, int max) {
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Feb 22 20:31:14 2011
@@ -52,10 +52,13 @@ import org.apache.hadoop.net.NetworkTopo
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
@@ -187,10 +190,10 @@ public class FSNamesystem implements FSC
// Stores the correct file name hierarchy
//
public FSDirectory dir;
-
- // TODO:FEDERATION initialize from the persisted information
- String poolId = "TODO";
BlockManager blockManager;
+
+ // Block pool ID used by this namenode
+ String blockPoolId;
/**
* Stores the datanode -> block map.
@@ -1525,11 +1528,11 @@ public class FSNamesystem implements FSC
}
ExtendedBlock getExtendedBlock(Block blk) {
- return new ExtendedBlock(poolId, blk);
+ return new ExtendedBlock(blockPoolId, blk);
}
-
+
void setBlockPoolId(String bpid) {
- poolId = bpid;
+ blockPoolId = bpid;
}
/**
@@ -1773,7 +1776,7 @@ public class FSNamesystem implements FSC
b.setGenerationStamp(getGenerationStamp());
b = dir.addBlock(src, inodes, b, targets);
NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
- +src+ ". "+b);
+ +src+ ". " + blockPoolId + " "+ b);
return b;
}
@@ -2734,14 +2737,17 @@ public class FSNamesystem implements FSC
ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
//check pending replication
- cmd = nodeinfo.getReplicationCommand(
+ List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
blockManager.maxReplicationStreams - xmitsInProgress);
- if (cmd != null) {
+ if (pendingList != null) {
+ cmd = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+ pendingList);
cmds.add(cmd);
}
//check block invalidation
- cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
- if (cmd != null) {
+ Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+ if (blks != null) {
+ cmd = new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks);
cmds.add(cmd);
}
// check access key update
@@ -3210,17 +3216,10 @@ public class FSNamesystem implements FSC
}
}
- private void checkPoolId(String thatPoolId) throws IOException {
- if (!this.poolId.equals(thatPoolId)) {
- throw new IOException("PoolId " + thatPoolId
- + " does not belong to expected pool " + poolId);
- }
- }
-
private void checkBlock(ExtendedBlock block) throws IOException {
- if (block != null && !this.poolId.equals(block.getPoolId())) {
- throw new IOException("Block " + block
- + " does not belong to expected pool " + poolId);
+ if (block != null && !this.blockPoolId.equals(block.getPoolId())) {
+ throw new IOException("Unexpected BlockPoolId " + block.getPoolId()
+ + " - expected " + blockPoolId);
}
}
@@ -5234,6 +5233,6 @@ public class FSNamesystem implements FSC
}
public String getPoolId() {
- return poolId;
+ return blockPoolId;
}
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java Tue Feb 22 20:31:14 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.io.*;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockCommand extends DatanodeCommand {
+ String poolId;
Block blocks[];
DatanodeInfo targets[][];
@@ -48,9 +49,11 @@ public class BlockCommand extends Datano
* Create BlockCommand for transferring blocks to another datanode
* @param blocktargetlist blocks to be transferred
*/
- public BlockCommand(int action, List<BlockTargetPair> blocktargetlist) {
+ public BlockCommand(int action, String poolId,
+ List<BlockTargetPair> blocktargetlist) {
super(action);
+ this.poolId = poolId;
blocks = new Block[blocktargetlist.size()];
targets = new DatanodeInfo[blocks.length][];
for(int i = 0; i < blocks.length; i++) {
@@ -66,12 +69,17 @@ public class BlockCommand extends Datano
* Create BlockCommand for the given action
* @param blocks blocks related to the action
*/
- public BlockCommand(int action, Block blocks[]) {
+ public BlockCommand(int action, String poolId, Block blocks[]) {
super(action);
+ this.poolId = poolId;
this.blocks = blocks;
this.targets = EMPTY_TARGET;
}
+ public String getPoolId() {
+ return poolId;
+ }
+
public Block[] getBlocks() {
return blocks;
}
@@ -93,6 +101,7 @@ public class BlockCommand extends Datano
public void write(DataOutput out) throws IOException {
super.write(out);
+ Text.writeString(out, poolId);
out.writeInt(blocks.length);
for (int i = 0; i < blocks.length; i++) {
blocks[i].write(out);
@@ -108,6 +117,7 @@ public class BlockCommand extends Datano
public void readFields(DataInput in) throws IOException {
super.readFields(in);
+ this.poolId = Text.readString(in);
this.blocks = new Block[in.readInt()];
for (int i = 0; i < blocks.length; i++) {
blocks[i] = new Block();
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Feb 22 20:31:14 2011
@@ -1134,8 +1134,9 @@ public class MiniDFSCluster {
if (!(dataSet instanceof SimulatedFSDataset)) {
throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
}
+ String bpid = getNamesystem().getPoolId();
SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
- sdataset.injectBlocks(blocksToInject);
+ sdataset.injectBlocks(bpid, blocksToInject);
dataNodes.get(dataNodeIndex).datanode.scheduleBlockReport(0);
}
@@ -1148,7 +1149,8 @@ public class MiniDFSCluster {
* if any of blocks already exist in the data nodes
* Note the rest of the blocks are not injected.
*/
- public void injectBlocks(Iterable<Block>[] blocksToInject) throws IOException {
+ public void injectBlocks(Iterable<Block>[] blocksToInject)
+ throws IOException {
if (blocksToInject.length > dataNodes.size()) {
throw new IndexOutOfBoundsException();
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java Tue Feb 22 20:31:14 2011
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.shell.Count;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.io.IOUtils;
@@ -1108,10 +1109,13 @@ public class TestDFSShell extends TestCa
List<File> files = new ArrayList<File>();
List<DataNode> datanodes = cluster.getDataNodes();
Iterable<Block>[] blocks = cluster.getAllBlockReports();
+ ExtendedBlock blk = new ExtendedBlock();
+ String poolId = cluster.getNamesystem().getPoolId();
for(int i = 0; i < blocks.length; i++) {
FSDataset ds = (FSDataset)datanodes.get(i).getFSDataset();
for(Block b : blocks[i]) {
- files.add(ds.getBlockFile(b));
+ blk.set(poolId, b);
+ files.add(ds.getBlockFile(blk.getLocalBlock()));
}
}
return files;
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java Tue Feb 22 20:31:14 2011
@@ -268,7 +268,8 @@ public class TestFileAppend3 extends jun
}
for(DatanodeInfo datanodeinfo : lb.getLocations()) {
final DataNode dn = cluster.getDataNode(datanodeinfo.getIpcPort());
- final Block metainfo = dn.data.getStoredBlock(blk.getBlockId());
+ final Block metainfo = dn.data.getStoredBlock(blk.getPoolId(),
+ blk.getBlockId());
assertEquals(size, metainfo.getNumBytes());
}
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Tue Feb 22 20:31:14 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -765,7 +766,8 @@ public class TestFileCreation extends ju
for(DatanodeInfo datanodeinfo: locatedblock.getLocations()) {
DataNode datanode = cluster.getDataNode(datanodeinfo.ipcPort);
FSDataset dataset = (FSDataset)datanode.data;
- Block b = dataset.getStoredBlock(locatedblock.getBlock().getBlockId());
+ ExtendedBlock blk = locatedblock.getBlock();
+ Block b = dataset.getStoredBlock(blk.getPoolId(), blk.getBlockId());
File blockfile = dataset.findBlockFile(b.getBlockId());
System.out.println("blockfile=" + blockfile);
if (blockfile != null) {
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java Tue Feb 22 20:31:14 2011
@@ -116,8 +116,8 @@ public class TestLeaseRecovery extends j
dfs.dfs.getNamenode(), filestr).getBlock();
long currentGS = lastblock.getGenerationStamp();
for(int i = 0; i < REPLICATION_NUM; i++) {
- updatedmetainfo[i] =
- datanodes[i].data.getStoredBlock(lastblock.getBlockId());
+ updatedmetainfo[i] = datanodes[i].data.getStoredBlock(lastblock
+ .getPoolId(), lastblock.getBlockId());
assertEquals(lastblock.getBlockId(), updatedmetainfo[i].getBlockId());
assertEquals(oldSize, updatedmetainfo[i].getNumBytes());
assertEquals(currentGS, updatedmetainfo[i].getGenerationStamp());
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Tue Feb 22 20:31:14 2011
@@ -323,8 +323,9 @@ public class SimulatedFSDataset impleme
blockMap = new HashMap<Block,BInfo>();
}
- public synchronized void injectBlocks(Iterable<Block> injectBlocks)
- throws IOException {
+ public synchronized void injectBlocks(String poolId,
+ Iterable<Block> injectBlocks) throws IOException {
+ ExtendedBlock blk = new ExtendedBlock();
if (injectBlocks != null) {
int numInjectedBlocks = 0;
for (Block b: injectBlocks) { // if any blocks in list is bad, reject list
@@ -332,7 +333,8 @@ public class SimulatedFSDataset impleme
if (b == null) {
throw new NullPointerException("Null blocks in block list");
}
- if (isValidBlock(b)) {
+ blk.set(poolId, b);
+ if (isValidBlock(blk)) {
throw new IOException("Block already exists in block list");
}
}
@@ -347,9 +349,10 @@ public class SimulatedFSDataset impleme
}
}
- @Override
- public synchronized void finalizeBlock(Block b) throws IOException {
- BInfo binfo = blockMap.get(b);
+ @Override // FSDatasetInterface
+ public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
+ // TODO:FEDERATION use ExtendedBlock
+ BInfo binfo = blockMap.get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("Finalizing a non existing block " + b);
}
@@ -357,10 +360,10 @@ public class SimulatedFSDataset impleme
}
- @Override
- public synchronized void unfinalizeBlock(Block b) throws IOException {
+ @Override // FSDatasetInterface
+ public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
if (isBeingWritten(b)) {
- blockMap.remove(b);
+ blockMap.remove(b.getLocalBlock());
}
}
@@ -392,9 +395,9 @@ public class SimulatedFSDataset impleme
return storage.getFree();
}
- @Override
- public synchronized long getLength(Block b) throws IOException {
- BInfo binfo = blockMap.get(b);
+ @Override // FSDatasetInterface
+ public synchronized long getLength(ExtendedBlock b) throws IOException {
+ BInfo binfo = blockMap.get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("Finalizing a non existing block " + b);
}
@@ -407,20 +410,22 @@ public class SimulatedFSDataset impleme
return blockMap.get(new Block(blockId));
}
- @Override
- public Block getStoredBlock(long blkid) throws IOException {
- Block b = new Block(blkid);
- BInfo binfo = blockMap.get(b);
+ @Override // FSDatasetInterface
+ public Block getStoredBlock(String poolId, long blkid) throws IOException {
+ ExtendedBlock b = new ExtendedBlock(poolId, blkid);
+ // TODO:FEDERATION use ExtendedBlock
+ BInfo binfo = blockMap.get(b.getLocalBlock());
if (binfo == null) {
return null;
}
b.setGenerationStamp(binfo.getGenerationStamp());
b.setNumBytes(binfo.getNumBytes());
- return b;
+ return b.getLocalBlock();
}
- @Override
- public synchronized void invalidate(Block[] invalidBlks) throws IOException {
+ @Override // FSDatasetInterface
+ public synchronized void invalidate(String poolId, Block[] invalidBlks)
+ throws IOException {
boolean error = false;
if (invalidBlks == null) {
return;
@@ -443,10 +448,11 @@ public class SimulatedFSDataset impleme
}
}
- @Override
- public synchronized boolean isValidBlock(Block b) {
+ @Override // FSDatasetInterface
+ public synchronized boolean isValidBlock(ExtendedBlock b) {
// return (blockMap.containsKey(b));
- BInfo binfo = blockMap.get(b);
+ // TODO:FEDERATION use ExtendedBlock
+ BInfo binfo = blockMap.get(b.getLocalBlock());
if (binfo == null) {
return false;
}
@@ -454,8 +460,9 @@ public class SimulatedFSDataset impleme
}
/* check if a block is created but not finalized */
- private synchronized boolean isBeingWritten(Block b) {
- BInfo binfo = blockMap.get(b);
+ private synchronized boolean isBeingWritten(ExtendedBlock b) {
+ // TODO:FEDERATION use ExtendedBlock
+ BInfo binfo = blockMap.get(b.getLocalBlock());
if (binfo == null) {
return false;
}
@@ -466,10 +473,10 @@ public class SimulatedFSDataset impleme
return getStorageInfo();
}
- @Override
- public synchronized ReplicaInPipelineInterface append(Block b,
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
- BInfo binfo = blockMap.get(b);
+ BInfo binfo = blockMap.get(b.getLocalBlock());
if (binfo == null || !binfo.isFinalized()) {
throw new ReplicaNotFoundException("Block " + b
+ " is not valid, and cannot be appended to.");
@@ -478,10 +485,10 @@ public class SimulatedFSDataset impleme
return binfo;
}
- @Override
- public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
- BInfo binfo = blockMap.get(b);
+ BInfo binfo = blockMap.get(b.getLocalBlock());
if (binfo == null) {
throw new ReplicaNotFoundException("Block " + b
+ " is not valid, and cannot be appended to.");
@@ -495,10 +502,10 @@ public class SimulatedFSDataset impleme
return binfo;
}
- @Override
- public void recoverClose(Block b, long newGS,
- long expectedBlockLen) throws IOException {
- BInfo binfo = blockMap.get(b);
+ @Override // FSDatasetInterface
+ public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
+ throws IOException {
+ BInfo binfo = blockMap.get(b.getLocalBlock());
if (binfo == null) {
throw new ReplicaNotFoundException("Block " + b
+ " is not valid, and cannot be appended to.");
@@ -506,15 +513,15 @@ public class SimulatedFSDataset impleme
if (!binfo.isFinalized()) {
binfo.finalizeBlock(binfo.getNumBytes());
}
- blockMap.remove(b);
+ blockMap.remove(b.getLocalBlock());
binfo.theBlock.setGenerationStamp(newGS);
blockMap.put(binfo.theBlock, binfo);
}
- @Override
- public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
- BInfo binfo = blockMap.get(b);
+ BInfo binfo = blockMap.get(b.getLocalBlock());
if ( binfo == null) {
throw new ReplicaNotFoundException("Block " + b
+ " does not exist, and cannot be appended to.");
@@ -529,14 +536,14 @@ public class SimulatedFSDataset impleme
return binfo;
}
- @Override
- public synchronized ReplicaInPipelineInterface createRbw(Block b)
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
throws IOException {
return createTemporary(b);
}
- @Override
- public synchronized ReplicaInPipelineInterface createTemporary(Block b)
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
throws IOException {
if (isValidBlock(b)) {
throw new ReplicaAlreadyExistsException("Block " + b +
@@ -546,15 +553,17 @@ public class SimulatedFSDataset impleme
throw new ReplicaAlreadyExistsException("Block " + b +
" is being written, and cannot be written to.");
}
- BInfo binfo = new BInfo(b, true);
+ // TODO:FEDERATION use ExtendedBlock
+ BInfo binfo = new BInfo(b.getLocalBlock(), true);
blockMap.put(binfo.theBlock, binfo);
return binfo;
}
- @Override
- public synchronized InputStream getBlockInputStream(Block b)
- throws IOException {
- BInfo binfo = blockMap.get(b);
+ @Override // FSDatasetInterface
+ public synchronized InputStream getBlockInputStream(ExtendedBlock b)
+ throws IOException {
+ // TODO:FEDERATION use ExtendedBlock
+ BInfo binfo = blockMap.get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("No such Block " + b );
}
@@ -563,18 +572,18 @@ public class SimulatedFSDataset impleme
return binfo.getIStream();
}
- @Override
- public synchronized InputStream getBlockInputStream(Block b, long seekOffset)
- throws IOException {
+ @Override // FSDatasetInterface
+ public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+ long seekOffset) throws IOException {
InputStream result = getBlockInputStream(b);
result.skip(seekOffset);
return result;
}
/** Not supported */
- @Override
- public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff
- ) throws IOException {
+ @Override // FSDatasetInterface
+ public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
+ long ckoff) throws IOException {
throw new IOException("Not supported");
}
@@ -585,9 +594,10 @@ public class SimulatedFSDataset impleme
* @throws IOException - block does not exist or problems accessing
* the meta file
*/
- private synchronized InputStream getMetaDataInStream(Block b)
+ private synchronized InputStream getMetaDataInStream(ExtendedBlock b)
throws IOException {
- BInfo binfo = blockMap.get(b);
+ // TODO:FEDERATION use ExtendedBlock
+ BInfo binfo = blockMap.get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("No such Block " + b );
}
@@ -598,9 +608,11 @@ public class SimulatedFSDataset impleme
return binfo.getMetaIStream();
}
- @Override
- public synchronized long getMetaDataLength(Block b) throws IOException {
- BInfo binfo = blockMap.get(b);
+ @Override // FSDatasetInterface
+ public synchronized long getMetaDataLength(ExtendedBlock b)
+ throws IOException {
+ // TODO:FEDERATION use ExtendedBlock
+ BInfo binfo = blockMap.get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("No such Block " + b );
}
@@ -611,16 +623,15 @@ public class SimulatedFSDataset impleme
return binfo.getMetaIStream().getLength();
}
- @Override
- public MetaDataInputStream getMetaDataInputStream(Block b)
- throws IOException {
-
- return new MetaDataInputStream(getMetaDataInStream(b),
- getMetaDataLength(b));
+ @Override // FSDatasetInterface
+ public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b)
+ throws IOException {
+ return new MetaDataInputStream(getMetaDataInStream(b),
+ getMetaDataLength(b));
}
- @Override
- public synchronized boolean metaFileExists(Block b) throws IOException {
+ @Override // FSDatasetInterface
+ public synchronized boolean metaFileExists(ExtendedBlock b) throws IOException {
if (!isValidBlock(b)) {
throw new IOException("Block " + b +
" is valid, and cannot be written to.");
@@ -632,8 +643,8 @@ public class SimulatedFSDataset impleme
// nothing to check for simulated data set
}
- @Override
- public synchronized void adjustCrcChannelPosition(Block b,
+ @Override // FSDatasetInterface
+ public synchronized void adjustCrcChannelPosition(ExtendedBlock b,
BlockWriteStreams stream,
int checksumSize)
throws IOException {
@@ -818,15 +829,15 @@ public class SimulatedFSDataset impleme
}
@Override // FSDatasetInterface
- public FinalizedReplica updateReplicaUnderRecovery(Block oldBlock,
+ public FinalizedReplica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId,
long newlength) throws IOException {
return new FinalizedReplica(
oldBlock.getBlockId(), newlength, recoveryId, null, null);
}
- @Override
- public long getReplicaVisibleLength(Block block) throws IOException {
+ @Override // FSDatasetInterface
+ public long getReplicaVisibleLength(ExtendedBlock block) throws IOException {
return block.getNumBytes();
}
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Tue Feb 22 20:31:14 2011
@@ -68,6 +68,7 @@ public class TestBlockReport {
static final int BLOCK_SIZE = 1024;
static final int NUM_BLOCKS = 10;
static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1;
+ static String bpid;
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
@@ -86,6 +87,7 @@ public class TestBlockReport {
REPL_FACTOR = 1; //Reset if case a test has modified the value
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build();
fs = (DistributedFileSystem) cluster.getFileSystem();
+ bpid = cluster.getNamesystem().getPoolId();
}
@After
@@ -173,7 +175,7 @@ public class TestBlockReport {
File dataDir = new File(cluster.getDataDirectory());
assertTrue(dataDir.isDirectory());
- List<Block> blocks2Remove = new ArrayList<Block>();
+ List<ExtendedBlock> blocks2Remove = new ArrayList<ExtendedBlock>();
List<Integer> removedIndex = new ArrayList<Integer>();
List<LocatedBlock> lBlocks = cluster.getNameNode().getBlockLocations(
filePath.toString(), FILE_START,
@@ -186,7 +188,7 @@ public class TestBlockReport {
}
for (Integer aRemovedIndex : removedIndex) {
- blocks2Remove.add(lBlocks.get(aRemovedIndex).getBlock().getLocalBlock());
+ blocks2Remove.add(lBlocks.get(aRemovedIndex).getBlock());
}
ArrayList<Block> blocks = locatedToBlocks(lBlocks, removedIndex);
@@ -194,7 +196,7 @@ public class TestBlockReport {
LOG.debug("Number of blocks allocated " + lBlocks.size());
}
- for (Block b : blocks2Remove) {
+ for (ExtendedBlock b : blocks2Remove) {
if(LOG.isDebugEnabled()) {
LOG.debug("Removing the block " + b.getBlockName());
}
@@ -694,7 +696,8 @@ public class TestBlockReport {
// Get block from the first DN
ret = cluster.getDataNodes().get(DN_N0).
- data.getStoredBlock(lb.getBlock().getBlockId());
+ data.getStoredBlock(lb.getBlock()
+ .getPoolId(), lb.getBlock().getBlockId());
return ret;
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java Tue Feb 22 20:31:14 2011
@@ -121,7 +121,8 @@ public class TestDatanodeRestart {
} else {
Assert.assertEquals(fileLen, replica.getNumBytes());
}
- dn.data.invalidate(new Block[]{replica});
+ String bpid = cluster.getNamesystem().getPoolId();
+ dn.data.invalidate(bpid, new Block[]{replica});
} finally {
IOUtils.closeStream(out);
if (fs.exists(src)) {
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Tue Feb 22 20:31:14 2011
@@ -48,7 +48,7 @@ import org.junit.Test;
*/
public class TestInterDatanodeProtocol {
public static void checkMetaInfo(ExtendedBlock b, DataNode dn) throws IOException {
- Block metainfo = dn.data.getStoredBlock(b.getBlockId());
+ Block metainfo = dn.data.getStoredBlock(b.getPoolId(), b.getBlockId());
Assert.assertEquals(b.getBlockId(), metainfo.getBlockId());
Assert.assertEquals(b.getNumBytes(), metainfo.getNumBytes());
}
@@ -209,7 +209,10 @@ public class TestInterDatanodeProtocol {
}
}
- /** Test {@link FSDataset#updateReplicaUnderRecovery(Block, long, long)} */
+ /**
+ * Test for
+ * {@link FSDataset#updateReplicaUnderRecovery(ExtendedBlock, long, long)}
+ * */
@Test
public void testUpdateReplicaUnderRecovery() throws IOException {
final Configuration conf = new HdfsConfiguration();
@@ -255,8 +258,8 @@ public class TestInterDatanodeProtocol {
//with (block length) != (stored replica's on disk length).
{
//create a block with same id and gs but different length.
- final Block tmp = new Block(rri.getBlockId(), rri.getNumBytes() - 1,
- rri.getGenerationStamp());
+ final ExtendedBlock tmp = new ExtendedBlock(b.getPoolId(), rri
+ .getBlockId(), rri.getNumBytes() - 1, rri.getGenerationStamp());
try {
//update should fail
fsdataset.updateReplicaUnderRecovery(tmp, recoveryid, newlength);
@@ -268,7 +271,7 @@ public class TestInterDatanodeProtocol {
//update
final ReplicaInfo finalized = fsdataset.updateReplicaUnderRecovery(
- rri, recoveryid, newlength);
+ new ExtendedBlock(b.getPoolId(), rri), recoveryid, newlength);
//check meta data after update
FSDataset.checkReplicaFiles(finalized);
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java?rev=1073489&r1=1073488&r2=1073489&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java Tue Feb 22 20:31:14 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
@@ -38,11 +39,10 @@ import org.apache.hadoop.util.DataChecks
*/
public class TestSimulatedFSDataset extends TestCase {
-
Configuration conf = null;
-
-
+ // TODO:FEDERATION initialize this
+ static String bpid;
static final int NUMBLOCKS = 20;
static final int BLOCK_LENGTH_MULTIPLIER = 79;
@@ -61,10 +61,13 @@ public class TestSimulatedFSDataset exte
return blkid*BLOCK_LENGTH_MULTIPLIER;
}
- int addSomeBlocks(FSDatasetInterface fsdataset, int startingBlockId) throws IOException {
+ int addSomeBlocks(FSDatasetInterface fsdataset, int startingBlockId)
+ throws IOException {
int bytesAdded = 0;
for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) {
- Block b = new Block(i, 0, 0); // we pass expected len as zero, - fsdataset should use the sizeof actual data written
+ ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0);
+ // we pass expected len as zero, - fsdataset should use the sizeof actual
+ // data written
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
BlockWriteStreams out = bInfo.createStreams(true, 512, 4);
try {
@@ -90,7 +93,7 @@ public class TestSimulatedFSDataset exte
public void testGetMetaData() throws IOException {
FSDatasetInterface fsdataset = new SimulatedFSDataset(conf);
- Block b = new Block(1, 5, 0);
+ ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0);
try {
assertFalse(fsdataset.metaFileExists(b));
assertTrue("Expected an IO exception", false);
@@ -98,7 +101,7 @@ public class TestSimulatedFSDataset exte
// ok - as expected
}
addSomeBlocks(fsdataset); // Only need to add one but ....
- b = new Block(1, 0, 0);
+ b = new ExtendedBlock(bpid, 1, 0, 0);
InputStream metaInput = fsdataset.getMetaDataInputStream(b);
DataInputStream metaDataInput = new DataInputStream(metaInput);
short version = metaDataInput.readShort();
@@ -122,7 +125,7 @@ public class TestSimulatedFSDataset exte
void checkBlockDataAndSize(FSDatasetInterface fsdataset,
- Block b, long expectedLen) throws IOException {
+ ExtendedBlock b, long expectedLen) throws IOException {
InputStream input = fsdataset.getBlockInputStream(b);
long lengthRead = 0;
int data;
@@ -137,7 +140,7 @@ public class TestSimulatedFSDataset exte
FSDatasetInterface fsdataset = new SimulatedFSDataset(conf);
addSomeBlocks(fsdataset);
for (int i=1; i <= NUMBLOCKS; ++i) {
- Block b = new Block(i, 0, 0);
+ ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0);
assertTrue(fsdataset.isValidBlock(b));
assertEquals(blockIdToLen(i), fsdataset.getLength(b));
checkBlockDataAndSize(fsdataset, b, blockIdToLen(i));
@@ -175,13 +178,14 @@ public class TestSimulatedFSDataset exte
SimulatedFSDataset sfsdataset = new SimulatedFSDataset(conf);
- sfsdataset.injectBlocks(blockReport);
+ sfsdataset.injectBlocks(bpid, blockReport);
blockReport = sfsdataset.getBlockReport();
assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
for (Block b: blockReport) {
assertNotNull(b);
assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
- assertEquals(blockIdToLen(b.getBlockId()), sfsdataset.getLength(b));
+ assertEquals(blockIdToLen(b.getBlockId()), sfsdataset
+ .getLength(new ExtendedBlock(bpid, b)));
}
assertEquals(bytesAdded, sfsdataset.getDfsUsed());
assertEquals(sfsdataset.getCapacity()-bytesAdded, sfsdataset.getRemaining());
@@ -213,13 +217,14 @@ public class TestSimulatedFSDataset exte
assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
blockReport2 = sfsdataset.getBlockReport();
assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
- sfsdataset.injectBlocks(blockReport);
+ sfsdataset.injectBlocks(bpid, blockReport);
blockReport = sfsdataset.getBlockReport();
assertEquals(NUMBLOCKS*2, blockReport.getNumberOfBlocks());
for (Block b: blockReport) {
assertNotNull(b);
assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
- assertEquals(blockIdToLen(b.getBlockId()), sfsdataset.getLength(b));
+ assertEquals(blockIdToLen(b.getBlockId()), sfsdataset
+ .getLength(new ExtendedBlock(bpid, b)));
}
assertEquals(bytesAdded, sfsdataset.getDfsUsed());
assertEquals(sfsdataset.getCapacity()-bytesAdded, sfsdataset.getRemaining());
@@ -231,7 +236,7 @@ public class TestSimulatedFSDataset exte
try {
sfsdataset = new SimulatedFSDataset(conf);
- sfsdataset.injectBlocks(blockReport);
+ sfsdataset.injectBlocks(bpid, blockReport);
assertTrue("Expected an IO exception", false);
} catch (IOException e) {
// ok - as expected
@@ -239,7 +244,7 @@ public class TestSimulatedFSDataset exte
}
- public void checkInvalidBlock(Block b) throws IOException {
+ public void checkInvalidBlock(ExtendedBlock b) throws IOException {
FSDatasetInterface fsdataset = new SimulatedFSDataset(conf);
assertFalse(fsdataset.isValidBlock(b));
try {
@@ -267,12 +272,12 @@ public class TestSimulatedFSDataset exte
public void testInValidBlocks() throws IOException {
FSDatasetInterface fsdataset = new SimulatedFSDataset(conf);
- Block b = new Block(1, 5, 0);
+ ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0);
checkInvalidBlock(b);
// Now check invlaid after adding some blocks
addSomeBlocks(fsdataset);
- b = new Block(NUMBLOCKS + 99, 5, 0);
+ b = new ExtendedBlock(bpid, NUMBLOCKS + 99, 5, 0);
checkInvalidBlock(b);
}
@@ -283,9 +288,9 @@ public class TestSimulatedFSDataset exte
Block[] deleteBlocks = new Block[2];
deleteBlocks[0] = new Block(1, 0, 0);
deleteBlocks[1] = new Block(2, 0, 0);
- fsdataset.invalidate(deleteBlocks);
- checkInvalidBlock(deleteBlocks[0]);
- checkInvalidBlock(deleteBlocks[1]);
+ fsdataset.invalidate(bpid, deleteBlocks);
+ checkInvalidBlock(new ExtendedBlock(bpid, deleteBlocks[0]));
+ checkInvalidBlock(new ExtendedBlock(deleteBlocks[1]));
long sizeDeleted = blockIdToLen(1) + blockIdToLen(2);
assertEquals(bytesAdded-sizeDeleted, fsdataset.getDfsUsed());
assertEquals(fsdataset.getCapacity()-bytesAdded+sizeDeleted, fsdataset.getRemaining());
@@ -295,7 +300,7 @@ public class TestSimulatedFSDataset exte
// Now make sure the rest of the blocks are valid
for (int i=3; i <= NUMBLOCKS; ++i) {
Block b = new Block(i, 0, 0);
- assertTrue(fsdataset.isValidBlock(b));
+ assertTrue(fsdataset.isValidBlock(new ExtendedBlock(bpid, b)));
}
}