Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/09/29 08:59:49 UTC
svn commit: r819843 - in /hadoop/hdfs/branches/HDFS-265: ./
src/java/org/apache/hadoop/hdfs/
src/java/org/apache/hadoop/hdfs/server/datanode/
src/test/hdfs/org/apache/hadoop/hdfs/
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Author: hairong
Date: Tue Sep 29 06:59:47 2009
New Revision: 819843
URL: http://svn.apache.org/viewvc?rev=819843&view=rev
Log:
HDFS-642. Support pipeline close and close error recovery. Contributed by Hairong Kuang.
Modified:
hadoop/hdfs/branches/HDFS-265/CHANGES.txt
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Tue Sep 29 06:59:47 2009
@@ -50,6 +50,8 @@
HDFS-627. Support replica update in data-node.
(Tsz Wo (Nicholas), SZE and Hairong Kuang via shv)
+ HDFS-642. Support pipeline close and close error recovery. (hairong)
+
IMPROVEMENTS
HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Sep 29 06:59:47 2009
@@ -1431,8 +1431,8 @@
int dataLen = in.readInt();
// Sanity check the lengths
- if ( dataLen < 0 ||
- ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
+ if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
+ ( dataLen != 0 && lastPacketInBlock) ||
(seqno != (lastSeqNo + 1)) ) {
throw new IOException("BlockReader: error in packet header" +
"(chunkOffset : " + chunkOffset +
@@ -2598,7 +2598,16 @@
response.start();
stage = BlockConstructionStage.DATA_STREAMING;
}
-
+
+ private void endBlock() {
+ LOG.debug("Closing old block " + block);
+ this.setName("DataStreamer for file " + src);
+ closeResponder();
+ closeStream();
+ nodes = null;
+ stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+ }
+
/*
* streamer thread is the only thread that opens streams to datanode,
* and closes them. Any error recovery is also done by this thread.
@@ -2642,8 +2651,6 @@
one = dataQueue.getFirst();
}
- long offsetInBlock = one.offsetInBlock;
-
// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
LOG.debug("Allocating new block");
@@ -2655,14 +2662,34 @@
initDataStreaming();
}
- if (offsetInBlock >= blockSize) {
+ long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
+ if (lastByteOffsetInBlock > blockSize) {
throw new IOException("BlockSize " + blockSize +
" is smaller than data size. " +
" Offset of packet in block " +
- offsetInBlock +
+ lastByteOffsetInBlock +
" Aborting file " + src);
}
+ if (one.lastPacketInBlock) {
+ // wait until all data packets have been successfully acked
+ synchronized (dataQueue) {
+ while (!streamerClosed && !hasError &&
+ ackQueue.size() != 0 && clientRunning) {
+ try {
+ // wait for acks to arrive from datanodes
+ dataQueue.wait(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ if (streamerClosed || hasError || !clientRunning) {
+ continue;
+ }
+ stage = BlockConstructionStage.PIPELINE_CLOSE;
+ }
+
+ // send the packet
ByteBuffer buf = one.getBuffer();
synchronized (dataQueue) {
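
Both wait loops in this hunk follow the same monitor pattern: block on dataQueue until ackQueue drains, waking once a second to re-check the exit flags. A self-contained illustration of the pattern with hypothetical names (not the DFSClient fields):

    import java.util.LinkedList;
    import java.util.Queue;

    // Monitor pattern used by the streamer's wait-for-acks loops (sketch).
    class AckWaiter {
      private final Object lock = new Object();
      private final Queue<Long> ackQueue = new LinkedList<Long>();

      void enqueue(long seqno) {       // called when a packet is sent
        synchronized (lock) {
          ackQueue.add(seqno);
        }
      }

      void acked(long seqno) {         // called when its ack arrives
        synchronized (lock) {
          ackQueue.remove(seqno);
          lock.notifyAll();
        }
      }

      void waitForAllAcks() throws InterruptedException {
        synchronized (lock) {
          while (!ackQueue.isEmpty()) {
            lock.wait(1000);           // wake periodically, as above
          }
        }
      }
    }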
@@ -2674,11 +2701,7 @@
if (LOG.isDebugEnabled()) {
LOG.debug("DataStreamer block " + block +
- " sending packet seqno:" + one.seqno +
- " size:" + buf.remaining() +
- " offsetInBlock:" + one.offsetInBlock +
- " lastPacketInBlock:" + one.lastPacketInBlock +
- " lastByteOffsetInBlock" + one.getLastByteOffsetBlock());
+ " sending packet " + one);
}
// write out data to remote datanode
@@ -2690,22 +2713,31 @@
if (bytesSent < tmpBytesSent) {
bytesSent = tmpBytesSent;
}
-
+
+ if (streamerClosed || hasError || !clientRunning) {
+ continue;
+ }
+
+ // Is this block full?
if (one.lastPacketInBlock) {
+ // wait until the close packet has been acked
synchronized (dataQueue) {
- while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
- try {
- dataQueue.wait(1000); // wait for acks to arrive from datanodes
- } catch (InterruptedException e) {
- }
+ while (!streamerClosed && !hasError &&
+ ackQueue.size() != 0 && clientRunning) {
+ dataQueue.wait(1000); // wait for acks to arrive from datanodes
}
}
-
- if (ackQueue.isEmpty()) { // done receiving all acks
- // indicate end-of-block
- blockStream.writeInt(0);
- blockStream.flush();
+ if (streamerClosed || hasError || !clientRunning) {
+ continue;
}
+
+ endBlock();
+ }
+ if (progress != null) { progress.progress(); }
+
+ // This is used by unit test to trigger race conditions.
+ if (artificialSlowdown != 0 && clientRunning) {
+ Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
LOG.warn("DataStreamer Exception: " +
@@ -2718,29 +2750,6 @@
streamerClosed = true;
}
}
-
-
- if (streamerClosed || hasError || !clientRunning) {
- continue;
- }
-
- // Is this block full?
- if (one.lastPacketInBlock) {
- LOG.debug("Closing old block " + block);
- this.setName("DataStreamer for file " + src);
- closeResponder();
- closeStream();
- nodes = null;
- stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
- }
- if (progress != null) { progress.progress(); }
-
- // This is used by unit test to trigger race conditions.
- if (artificialSlowdown != 0 && clientRunning) {
- try {
- Thread.sleep(artificialSlowdown);
- } catch (InterruptedException e) {}
- }
}
closeInternal();
}
@@ -2928,7 +2937,15 @@
boolean doSleep = setupPipelineForAppendOrRecovery();
if (!streamerClosed && clientRunning) {
- initDataStreaming();
+ if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
+ synchronized (dataQueue) {
+ dataQueue.remove(); // remove the end of block packet
+ dataQueue.notifyAll();
+ }
+ endBlock();
+ } else {
+ initDataStreaming();
+ }
}
return doSleep;
@@ -3392,15 +3409,6 @@
", blockSize=" + blockSize +
", appendChunk=" + appendChunk);
}
- //
- // if we allocated a new packet because we encountered a block
- // boundary, reset bytesCurBlock.
- //
- if (bytesCurBlock == blockSize) {
- currentPacket.lastPacketInBlock = true;
- bytesCurBlock = 0;
- lastFlushOffset = -1;
- }
waitAndQueuePacket(currentPacket);
currentPacket = null;
@@ -3413,6 +3421,20 @@
}
int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
computePacketChunkSize(psize, bytesPerChecksum);
+
+ //
+ // if encountering a block boundary, send an empty packet to
+ // indicate the end of block and reset bytesCurBlock.
+ //
+ if (bytesCurBlock == blockSize) {
+ currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
+ bytesCurBlock);
+ currentPacket.lastPacketInBlock = true;
+ waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ bytesCurBlock = 0;
+ lastFlushOffset = -1;
+ }
}
}
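
With this hunk the writer no longer folds the last-packet flag into a data packet; on reaching a block boundary it queues a separate zero-payload packet whose only job is to mark the end of the block. A compact sketch of that boundary handling, with stand-ins for DFSClient.Packet and DataNode.PKT_HEADER_LEN (the header length value here is illustrative, not the real constant):

    // Sketch of the end-of-block marker packet (hypothetical stand-in types).
    class EndOfBlockSketch {
      static final int PKT_HEADER_LEN = 25;  // assumed header size, for illustration

      static class Packet {
        final int bufSize;
        final int numChunks;
        final long offsetInBlock;
        boolean lastPacketInBlock;

        Packet(int bufSize, int numChunks, long offsetInBlock) {
          this.bufSize = bufSize;
          this.numChunks = numChunks;
          this.offsetInBlock = offsetInBlock;
        }
      }

      // Returns the marker packet at a block boundary, or null otherwise.
      static Packet maybeEndBlock(long bytesCurBlock, long blockSize) {
        if (bytesCurBlock != blockSize) {
          return null;
        }
        Packet p = new Packet(PKT_HEADER_LEN + 4, 0, bytesCurBlock);
        p.lastPacketInBlock = true;          // zero chunks, zero payload
        return p;
      }
    }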
@@ -3556,21 +3578,22 @@
try {
flushBuffer(); // flush from all upper layers
- // Mark that this packet is the last packet in block.
- // If there are no outstanding packets and the last packet
- // was not the last one in the current block, then create a
- // packet with empty payload.
- if (currentPacket == null && bytesCurBlock != 0) {
- currentPacket = new Packet(packetSize, chunksPerPacket,
- bytesCurBlock);
- }
if (currentPacket != null) {
+ waitAndQueuePacket(currentPacket);
+ }
+
+ if (bytesCurBlock != 0) {
+ // send an empty packet to mark the end of the block
+ currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
+ bytesCurBlock);
currentPacket.lastPacketInBlock = true;
}
flushInternal(); // flush all data to Datanodes
+ LOG.info("Done flushing");
// get last block before destroying the streamer
Block lastBlock = streamer.getBlock();
+ LOG.info("Closing the streams...");
closeThreads(false);
completeFile(lastBlock);
leasechecker.remove(src);
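
Taken together, the close() changes impose a fixed ordering: flush buffered data, queue any pending data packet, queue the explicit empty close packet, wait for every ack, and only then complete the file at the namenode. A condensed restatement with hypothetical helper names (not the actual DFSOutputStream methods):

    import java.io.IOException;

    // Condensed sketch of the close() ordering after this patch.
    abstract class CloseFlowSketch {
      abstract void flushBuffer() throws IOException;            // upper layers
      abstract void queuePendingDataPacket() throws IOException; // if any
      abstract void queueEmptyClosePacket() throws IOException;  // lastPacketInBlock
      abstract boolean blockHasBytes();                          // bytesCurBlock != 0
      abstract void waitForAllAcks() throws IOException;         // flushInternal()
      abstract void completeFileAtNamenode() throws IOException;

      final void close() throws IOException {
        flushBuffer();
        queuePendingDataPacket();
        if (blockHasBytes()) {
          queueEmptyClosePacket();   // empty packet marks the end of the block
        }
        waitForAllAcks();
        completeFileAtNamenode();
      }
    }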
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Sep 29 06:59:47 2009
@@ -102,14 +102,8 @@
replicaInfo = datanode.data.createRbw(block);
break;
case PIPELINE_SETUP_STREAMING_RECOVERY:
- if (datanode.data.isValidBlock(block)) {
- // pipeline failed after the replica is finalized. This will be
- // handled differently when pipeline close/recovery is introduced
- replicaInfo = datanode.data.append(block, newGs, maxBytesRcvd);
- } else {
- replicaInfo = datanode.data.recoverRbw(
- block, newGs, minBytesRcvd, maxBytesRcvd);
- }
+ replicaInfo = datanode.data.recoverRbw(
+ block, newGs, minBytesRcvd, maxBytesRcvd);
block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND:
@@ -330,7 +324,7 @@
* It tries to read a full packet with a single read call.
* Consecutive packets are usually of the same length.
*/
- private int readNextPacket() throws IOException {
+ private void readNextPacket() throws IOException {
/* This dances around buf a little bit, mainly to read a
* full packet with a single read and to accept an arbitrary size
* for the next packet at the same time.
@@ -366,12 +360,6 @@
int payloadLen = buf.getInt();
buf.reset();
- if (payloadLen == 0) {
- //end of stream!
- buf.limit(buf.position() + SIZE_OF_INTEGER);
- return 0;
- }
-
// check corrupt values for pktLen, 100MB upper limit should be ok?
if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
throw new IOException("Incorrect value for packet payload : " +
@@ -411,21 +399,15 @@
if (pktSize > maxPacketReadLen) {
maxPacketReadLen = pktSize;
}
-
- return payloadLen;
}
/**
* Receives and processes a packet. It can contain many chunks.
- * returns size of the packet.
+ * returns the number of data bytes that the packet has.
*/
private int receivePacket() throws IOException {
-
- int payloadLen = readNextPacket();
-
- if (payloadLen <= 0) {
- return payloadLen;
- }
+ // read the next packet
+ readNextPacket();
buf.mark();
//read the header
@@ -451,7 +433,7 @@
if (LOG.isDebugEnabled()){
LOG.debug("Receiving one packet for block " + block +
- " of length " + payloadLen +
+ " of length " + len +
" seqno " + seqno +
" offsetInBlock " + offsetInBlock +
" lastPacketInBlock " + lastPacketInBlock);
@@ -462,6 +444,12 @@
if (replicaInfo.getNumBytes() < offsetInBlock) {
replicaInfo.setNumBytes(offsetInBlock);
}
+
+ // put in queue for pending acks
+ if (responder != null) {
+ ((PacketResponder)responder.getRunnable()).enqueue(seqno,
+ lastPacketInBlock, offsetInBlock);
+ }
//First write the packet to the mirror:
if (mirrorOut != null) {
@@ -475,8 +463,8 @@
buf.position(endOfHeader);
- if (len == 0) {
- LOG.debug("Receiving empty packet for block " + block);
+ if (lastPacketInBlock || len == 0) {
+ LOG.debug("Receiving an empty packet or the end of the block " + block);
} else {
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize;
@@ -539,17 +527,11 @@
/// flush entire packet before sending ack
flush();
- // put in queue for pending acks
- if (responder != null) {
- ((PacketResponder)responder.getRunnable()).enqueue(seqno,
- lastPacketInBlock, offsetInBlock);
- }
-
if (throttler != null) { // throttle I/O
- throttler.throttle(payloadLen);
+ throttler.throttle(len);
}
- return payloadLen;
+ return len;
}
void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -578,20 +560,10 @@
}
/*
- * Receive until packet length is zero.
+ * Receive until the packet has zero bytes of data.
*/
while (receivePacket() > 0) {}
- // flush the mirror out
- if (mirrorOut != null) {
- try {
- mirrorOut.writeInt(0); // mark the end of the block
- mirrorOut.flush();
- } catch (IOException e) {
- handleMirrorOutError(e);
- }
- }
-
// wait for all outstanding packet responses. And then
// indicate responder to gracefully shutdown.
// Mark that responder has been closed for future processing
@@ -846,9 +818,7 @@
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
- datanode.myMetrics.blocksWritten.inc();
- datanode.notifyNamenodeReceivedBlock(block,
- DataNode.EMPTY_DEL_HINT);
+ datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
if (ClientTraceLog.isInfoEnabled() &&
receiver.clientName.length() > 0) {
long offset = 0;
@@ -987,9 +957,7 @@
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
- datanode.myMetrics.blocksWritten.inc();
- datanode.notifyNamenodeReceivedBlock(block,
- DataNode.EMPTY_DEL_HINT);
+ datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
if (ClientTraceLog.isInfoEnabled() &&
receiver.clientName.length() > 0) {
long offset = 0;
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Sep 29 06:59:47 2009
@@ -270,10 +270,6 @@
int len = Math.min((int) (endOffset - offset),
bytesPerChecksum*maxChunks);
- if (len == 0) {
- return 0;
- }
-
int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
int packetLen = len + numChunks*checksumSize + 4;
pkt.clear();
@@ -282,7 +278,7 @@
pkt.putInt(packetLen);
pkt.putLong(offset);
pkt.putLong(seqno);
- pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
+ pkt.put((byte)((len == 0) ? 1 : 0));
//why no ByteBuf.putBoolean()?
pkt.putInt(len);
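
For reference, the header layout written here is: a 4-byte packet length, an 8-byte offset in the block, an 8-byte sequence number, a 1-byte last-packet flag, and a 4-byte data length; after this change the flag is set exactly when the packet is empty. A self-contained encoding sketch following that field order:

    import java.nio.ByteBuffer;

    // Sketch: packet header in the field order used by BlockSender above.
    class PacketHeaderSketch {
      static ByteBuffer encode(int packetLen, long offsetInBlock,
                               long seqno, int dataLen) {
        ByteBuffer pkt = ByteBuffer.allocate(4 + 8 + 8 + 1 + 4);
        pkt.putInt(packetLen);
        pkt.putLong(offsetInBlock);
        pkt.putLong(seqno);
        pkt.put((byte) (dataLen == 0 ? 1 : 0)); // empty packet now means "last"
        pkt.putInt(dataLen);
        pkt.flip();
        return pkt;
      }
    }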
@@ -443,7 +439,8 @@
seqno++;
}
try {
- out.writeInt(0); // mark the end of block
+ // send an empty packet to mark the end of the block
+ sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Sep 29 06:59:47 2009
@@ -1275,6 +1275,20 @@
}
}
}
+
+ /**
+ * After a block becomes finalized, the datanode increments its metrics
+ * counter, notifies the namenode, and adds the block to the block scanner
+ * @param block
+ * @param delHint
+ */
+ void closeBlock(Block block, String delHint) {
+ myMetrics.blocksWritten.inc();
+ notifyNamenodeReceivedBlock(block, delHint);
+ if (blockScanner != null) {
+ blockScanner.addBlock(block);
+ }
+ }
/**
* No matter what kind of exception we get, keep retrying to offerService().
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Sep 29 06:59:47 2009
@@ -252,12 +252,17 @@
String firstBadLink = ""; // first datanode that failed in connection setup
DataTransferProtocol.Status mirrorInStatus = SUCCESS;
try {
- // open a block receiver and check if the block does not exist
- blockReceiver = new BlockReceiver(block, in,
- s.getRemoteSocketAddress().toString(),
- s.getLocalSocketAddress().toString(),
- stage, newGs, minBytesRcvd, maxBytesRcvd,
- client, srcDataNode, datanode);
+ if (client.length() == 0 ||
+ stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+ // open a block receiver
+ blockReceiver = new BlockReceiver(block, in,
+ s.getRemoteSocketAddress().toString(),
+ s.getLocalSocketAddress().toString(),
+ stage, newGs, minBytesRcvd, maxBytesRcvd,
+ client, srcDataNode, datanode);
+ } else {
+ datanode.data.recoverClose(block, newGs, minBytesRcvd);
+ }
//
// Open network conn to backup machine, if
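
For a client-initiated close recovery the DataXceiver no longer constructs a BlockReceiver at all; it asks the dataset to recover the close directly. The branch reduces to a simple predicate (a sketch with a stand-in enum, simplified from the hunk above):

    // Simplified decision from the DataXceiver hunk (stand-in enum).
    class WriteBlockSketch {
      enum Stage { PIPELINE_SETUP_CREATE, PIPELINE_CLOSE_RECOVERY /* others omitted */ }

      // Replication requests (empty client name) and every stage other than
      // PIPELINE_CLOSE_RECOVERY still stream bytes through a BlockReceiver.
      static boolean needsBlockReceiver(String client, Stage stage) {
        return client.length() == 0 || stage != Stage.PIPELINE_CLOSE_RECOVERY;
      }
    }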
@@ -289,7 +294,9 @@
pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, client,
srcDataNode, targets, accessToken);
- blockReceiver.writeChecksumHeader(mirrorOut);
+ if (blockReceiver != null) { // send checksum header
+ blockReceiver.writeChecksumHeader(mirrorOut);
+ }
mirrorOut.flush();
// read connect ack (only for clients, not for replication req)
@@ -340,24 +347,30 @@
}
// receive the block and mirror to the next target
- String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
- blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
- mirrorAddr, null, targets.length);
+ if (blockReceiver != null) {
+ String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
+ blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
+ mirrorAddr, null, targets.length);
+ }
- // if this write is for a replication request (and not
- // from a client), then confirm block. For client-writes,
+ // update its generation stamp
+ if (client.length() != 0 &&
+ stage != BlockConstructionStage.PIPELINE_SETUP_CREATE) {
+ block.setGenerationStamp(newGs);
+ }
+
+ // if this write is for a replication request or recovering
+ // a failed close for client, then confirm block. For other client-writes,
// the block is finalized in the PacketResponder.
- if (client.length() == 0) {
- datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
+ if (client.length() == 0 ||
+ stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+ datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
LOG.info("Received block " + block +
" src: " + remoteAddress +
" dest: " + localAddress +
" of size " + block.getNumBytes());
}
- if (datanode.blockScanner != null) {
- datanode.blockScanner.addBlock(block);
- }
} catch (IOException ioe) {
LOG.info("writeBlock " + block + " received exception " + ioe);
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Sep 29 06:59:47 2009
@@ -1175,17 +1175,21 @@
return newReplicaInfo;
}
- @Override // FSDatasetInterface
- public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
- long newGS, long expectedBlockLen) throws IOException {
- DataNode.LOG.info("Recover failed append to " + b);
-
+ private ReplicaInfo recoverCheck(Block b, long newGS,
+ long expectedBlockLen) throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
if (replicaInfo == null) {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
}
+ // check state
+ if (replicaInfo.getState() != ReplicaState.FINALIZED &&
+ replicaInfo.getState() != ReplicaState.RBW) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
+ }
+
// check generation stamp
long replicaGenerationStamp = replicaInfo.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1219,20 +1223,39 @@
" with a length of " + replicaLen +
" expected length is " + expectedBlockLen);
}
+
+ return replicaInfo;
+ }
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+ long newGS, long expectedBlockLen) throws IOException {
+ DataNode.LOG.info("Recover failed append to " + b);
+
+ ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// change the replica's state/gs etc.
- switch (replicaInfo.getState()) {
- case FINALIZED:
+ if (replicaInfo.getState() == ReplicaState.FINALIZED ) {
return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
- case RBW:
+ } else { //RBW
bumpReplicaGS(replicaInfo, newGS);
return (ReplicaBeingWritten)replicaInfo;
- default:
- throw new ReplicaNotFoundException(
- ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
}
}
+ @Override
+ public void recoverClose(Block b, long newGS,
+ long expectedBlockLen) throws IOException {
+ DataNode.LOG.info("Recover failed close " + b);
+ // check replica's state
+ ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+ // bump the replica's GS
+ bumpReplicaGS(replicaInfo, newGS);
+ // finalize the replica if RBW
+ if (replicaInfo.getState() == ReplicaState.RBW) {
+ finalizeBlock(replicaInfo);
+ }
+ }
+
/**
* Bump a replica's generation stamp to a new one.
* Its on-disk meta file name is renamed to be the new one too.
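
The recovery semantics added here form a small state machine: a FINALIZED replica only has its generation stamp bumped, an RBW replica is bumped and then finalized, and every other state is rejected. A compact restatement with simplified stand-in types (not the FSDataset code):

    import java.io.IOException;

    // Sketch of recoverClose's state handling (stand-in enum and actions).
    class RecoverCloseSketch {
      enum ReplicaState { FINALIZED, RBW, RWR, RUR, TEMPORARY }

      static void recoverClose(ReplicaState state) throws IOException {
        switch (state) {
        case FINALIZED:
          // bump the generation stamp only; the replica is already durable
          break;
        case RBW:
          // bump the generation stamp, then finalize the replica
          break;
        default:
          throw new IOException(
              "Cannot recover close: replica is neither FINALIZED nor RBW");
        }
      }
    }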
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Tue Sep 29 06:59:47 2009
@@ -240,6 +240,18 @@
long newGS, long expectedBlockLen) throws IOException;
/**
+ * Recover a failed pipeline close.
+ * It bumps the replica's generation stamp and finalizes the replica if it is in the RBW state
+ *
+ * @param b block
+ * @param newGS the new generation stamp for the replica
+ * @param expectedBlockLen the number of bytes the replica is expected to have
+ * @throws IOException
+ */
+ public void recoverClose(Block b,
+ long newGS, long expectedBlockLen) throws IOException;
+
+ /**
* Update the block to the new generation stamp and length.
*/
public void updateBlock(Block oldblock, Block newblock) throws IOException;
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java Tue Sep 29 06:59:47 2009
@@ -33,7 +33,7 @@
final static String UNFINALIZED_REPLICA =
"Cannot append to an unfinalized replica ";
final static String UNFINALIZED_AND_NONRBW_REPLICA =
- "Cannot recover appending to a replica that's not FINALIZED and not RBW ";
+ "Cannot recover append/close to a replica that's not FINALIZED and not RBW ";
final static String NON_EXISTENT_REPLICA =
"Cannot append to a non-existent replica ";
final static String UNEXPECTED_GS_REPLICA =
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Sep 29 06:59:47 2009
@@ -46,7 +46,6 @@
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -154,10 +153,7 @@
sendOut.writeInt(0); // chunk length
sendOut.writeInt(0); // zero checksum
-
- // mark the end of block
- sendOut.writeInt(0);
-
+
//ok finally write a block with 0 len
SUCCESS.write(recvOut);
Text.writeString(recvOut, ""); // first bad node
@@ -177,6 +173,11 @@
if (eofExcepted) {
ERROR.write(recvOut);
sendRecvData(description, true);
+ } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+ //ok finally write a block with 0 len
+ SUCCESS.write(recvOut);
+ Text.writeString(recvOut, ""); // first bad node
+ sendRecvData(description, false);
} else {
writeZeroLengthPacket(block, description);
}
@@ -208,8 +209,7 @@
long newGS = firstBlock.getGenerationStamp() + 1;
testWrite(firstBlock,
BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
- newGS, "Successful for now", false);
- firstBlock.setGenerationStamp(newGS);
+ newGS, "Cannot recover data streaming to a finalized replica", true);
// test PIPELINE_SETUP_APPEND on an existing block
newGS = firstBlock.getGenerationStamp() + 1;
testWrite(firstBlock,
@@ -217,10 +217,21 @@
newGS, "Append to a finalized replica", false);
firstBlock.setGenerationStamp(newGS);
// test PIPELINE_SETUP_APPEND_RECOVERY on an existing block
+ file = new Path("dataprotocol1.dat");
+ DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+ firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
newGS = firstBlock.getGenerationStamp() + 1;
testWrite(firstBlock,
BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS,
"Recover appending to a finalized replica", false);
+ // test PIPELINE_CLOSE_RECOVERY on an existing block
+ file = new Path("dataprotocol2.dat");
+ DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+ firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+ newGS = firstBlock.getGenerationStamp() + 1;
+ testWrite(firstBlock,
+ BlockConstructionStage.PIPELINE_CLOSE_RECOVERY, newGS,
+ "Recover failed close to a finalized replica", false);
firstBlock.setGenerationStamp(newGS);
/* Test writing to a new block */
@@ -276,11 +287,19 @@
newGS, "Recover append to a RBW replica", false);
firstBlock.setGenerationStamp(newGS);
// test PIPELINE_SETUP_STREAMING_RECOVERY on a RBW block
+ file = new Path("dataprotocol2.dat");
+ DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+ out = (DFSOutputStream)(fileSys.append(file).
+ getWrappedStream());
+ out.write(1);
+ out.hflush();
+ in = fileSys.open(file);
+ firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
+ firstBlock.setNumBytes(2L);
newGS = firstBlock.getGenerationStamp() + 1;
testWrite(firstBlock,
BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
newGS, "Recover a RBW replica", false);
- firstBlock.setGenerationStamp(newGS);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java Tue Sep 29 06:59:47 2009
@@ -36,7 +36,7 @@
/** This class implements some of tests posted in HADOOP-2658. */
public class TestFileAppend3 extends junit.framework.TestCase {
- static final long BLOCK_SIZE = 3 * 64 * 1024;
+ static final long BLOCK_SIZE = 64 * 1024;
static final short REPLICATION = 3;
static final int DATANODE_NUM = 5;
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Tue Sep 29 06:59:47 2009
@@ -48,6 +48,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.log4j.Level;
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Tue Sep 29 06:59:47 2009
@@ -484,18 +484,36 @@
public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
long newGS, long expectedBlockLen) throws IOException {
BInfo binfo = blockMap.get(b);
- if (binfo == null || !binfo.isFinalized()) {
+ if (binfo == null) {
throw new ReplicaNotFoundException("Block " + b
+ " is not valid, and cannot be appended to.");
}
if (binfo.isFinalized()) {
binfo.unfinalizeBlock();
}
+ blockMap.remove(b);
binfo.theBlock.setGenerationStamp(newGS);
+ blockMap.put(binfo.theBlock, binfo);
return binfo;
}
@Override
+ public void recoverClose(Block b, long newGS,
+ long expectedBlockLen) throws IOException {
+ BInfo binfo = blockMap.get(b);
+ if (binfo == null) {
+ throw new ReplicaNotFoundException("Block " + b
+ + " is not valid, and cannot be appended to.");
+ }
+ if (!binfo.isFinalized()) {
+ binfo.finalizeBlock(binfo.getNumBytes());
+ }
+ blockMap.remove(b);
+ binfo.theBlock.setGenerationStamp(newGS);
+ blockMap.put(binfo.theBlock, binfo);
+ }
+
+ @Override
public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
BInfo binfo = blockMap.get(b);
@@ -507,7 +525,9 @@
throw new ReplicaAlreadyExistsException("Block " + b
+ " is valid, and cannot be written to.");
}
+ blockMap.remove(b);
binfo.theBlock.setGenerationStamp(newGS);
+ blockMap.put(binfo.theBlock, binfo);
return binfo;
}
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Tue Sep 29 06:59:47 2009
@@ -32,8 +32,11 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Sender;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AccessToken;
@@ -114,17 +117,12 @@
DataOutputStream out = new DataOutputStream(
s.getOutputStream());
- out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
- WRITE_BLOCK.write(out);
- out.writeLong( block.getBlock().getBlockId());
- out.writeLong( block.getBlock().getGenerationStamp() );
- out.writeInt(1);
- out.writeBoolean( false ); // recovery flag
- Text.writeString( out, "" );
- out.writeBoolean(false); // Not sending src node information
- out.writeInt(0);
- AccessToken.DUMMY_TOKEN.write(out);
-
+ Sender.opWriteBlock(out, block.getBlock().getBlockId(),
+ block.getBlock().getGenerationStamp(), 1,
+ BlockConstructionStage.PIPELINE_SETUP_CREATE,
+ 0L, 0L, 0L, "", null, new DatanodeInfo[0],
+ AccessToken.DUMMY_TOKEN);
+
// write check header
out.writeByte( 1 );
out.writeInt( 512 );
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java?rev=819843&r1=819842&r2=819843&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java Tue Sep 29 06:59:47 2009
@@ -41,6 +41,25 @@
final private static int RUR = 4;
final private static int NON_EXISTENT = 5;
+ // test close
+ @Test
+ public void testClose() throws Exception {
+ MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+ try {
+ cluster.waitActive();
+ DataNode dn = cluster.getDataNodes().get(0);
+ FSDataset dataSet = (FSDataset)dn.data;
+
+ // set up replicasMap
+ setup(dataSet);
+
+ // test close
+ testClose(dataSet);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
// test append
@Test
public void testAppend() throws Exception {
@@ -79,7 +98,7 @@
}
}
- // test writeToRbw
+ // test writeToTemporary
@Test
public void testWriteToTempoary() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
@@ -113,9 +132,8 @@
blocks[TEMPORARY].getGenerationStamp(), vol,
vol.createTmpFile(blocks[TEMPORARY]).getParentFile()));
- replicaInfo = new ReplicaBeingWritten(blocks[RBW].getBlockId(),
- blocks[RBW].getGenerationStamp(), vol,
- vol.createRbwFile(blocks[RBW]).getParentFile());
+ replicaInfo = new ReplicaBeingWritten(blocks[RBW], vol,
+ vol.createRbwFile(blocks[RBW]).getParentFile(), null);
replicasMap.add(replicaInfo);
replicaInfo.getBlockFile().createNewFile();
replicaInfo.getMetaFile().createNewFile();
@@ -127,8 +145,10 @@
}
private void testAppend(FSDataset dataSet) throws IOException {
- dataSet.append(blocks[FINALIZED], blocks[FINALIZED].getGenerationStamp()+1,
+ long newGS = blocks[FINALIZED].getGenerationStamp()+1;
+ dataSet.append(blocks[FINALIZED], newGS,
blocks[FINALIZED].getNumBytes()); // successful
+ blocks[FINALIZED].setGenerationStamp(newGS);
try {
dataSet.append(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1,
@@ -177,8 +197,106 @@
Assert.assertEquals(ReplicaNotFoundException.NON_EXISTENT_REPLICA +
blocks[NON_EXISTENT], e.getMessage());
}
+
+ newGS = blocks[FINALIZED].getGenerationStamp()+1;
+ dataSet.recoverAppend(blocks[FINALIZED], newGS,
+ blocks[FINALIZED].getNumBytes()); // successful
+ blocks[FINALIZED].setGenerationStamp(newGS);
+
+ try {
+ dataSet.recoverAppend(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1,
+ blocks[TEMPORARY].getNumBytes());
+ Assert.fail("Should not have appended to a temporary replica "
+ + blocks[TEMPORARY]);
+ } catch (ReplicaNotFoundException e) {
+ Assert.assertTrue(e.getMessage().startsWith(
+ ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+ }
+
+ newGS = blocks[RBW].getGenerationStamp()+1;
+ dataSet.recoverAppend(blocks[RBW], newGS, blocks[RBW].getNumBytes());
+ blocks[RBW].setGenerationStamp(newGS);
+
+ try {
+ dataSet.recoverAppend(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
+ blocks[RBW].getNumBytes());
+ Assert.fail("Should not have appended to an RWR replica" + blocks[RWR]);
+ } catch (ReplicaNotFoundException e) {
+ Assert.assertTrue(e.getMessage().startsWith(
+ ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+ }
+
+ try {
+ dataSet.recoverAppend(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
+ blocks[RUR].getNumBytes());
+ Assert.fail("Should not have appended to an RUR replica" + blocks[RUR]);
+ } catch (ReplicaNotFoundException e) {
+ Assert.assertTrue(e.getMessage().startsWith(
+ ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+ }
+
+ try {
+ dataSet.recoverAppend(blocks[NON_EXISTENT],
+ blocks[NON_EXISTENT].getGenerationStamp(),
+ blocks[NON_EXISTENT].getNumBytes());
+ Assert.fail("Should not have appended to a non-existent replica " +
+ blocks[NON_EXISTENT]);
+ } catch (ReplicaNotFoundException e) {
+ Assert.assertTrue(e.getMessage().startsWith(
+ ReplicaNotFoundException.NON_EXISTENT_REPLICA));
+ }
}
+ private void testClose(FSDataset dataSet) throws IOException {
+ long newGS = blocks[FINALIZED].getGenerationStamp()+1;
+ dataSet.recoverClose(blocks[FINALIZED], newGS,
+ blocks[FINALIZED].getNumBytes()); // successful
+ blocks[FINALIZED].setGenerationStamp(newGS);
+
+ try {
+ dataSet.recoverClose(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1,
+ blocks[TEMPORARY].getNumBytes());
+ Assert.fail("Should not have recovered close a temporary replica "
+ + blocks[TEMPORARY]);
+ } catch (ReplicaNotFoundException e) {
+ Assert.assertTrue(e.getMessage().startsWith(
+ ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+ }
+
+ newGS = blocks[RBW].getGenerationStamp()+1;
+ dataSet.recoverClose(blocks[RBW], newGS, blocks[RBW].getNumBytes());
+ blocks[RBW].setGenerationStamp(newGS);
+
+ try {
+ dataSet.recoverClose(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
+ blocks[RBW].getNumBytes());
+ Assert.fail("Should not have recovered close an RWR replica" + blocks[RWR]);
+ } catch (ReplicaNotFoundException e) {
+ Assert.assertTrue(e.getMessage().startsWith(
+ ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+ }
+
+ try {
+ dataSet.recoverClose(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
+ blocks[RUR].getNumBytes());
+ Assert.fail("Should not have recovered close an RUR replica" + blocks[RUR]);
+ } catch (ReplicaNotFoundException e) {
+ Assert.assertTrue(e.getMessage().startsWith(
+ ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
+ }
+
+ try {
+ dataSet.recoverClose(blocks[NON_EXISTENT],
+ blocks[NON_EXISTENT].getGenerationStamp(),
+ blocks[NON_EXISTENT].getNumBytes());
+ Assert.fail("Should not have recovered close a non-existent replica " +
+ blocks[NON_EXISTENT]);
+ } catch (ReplicaNotFoundException e) {
+ Assert.assertTrue(e.getMessage().startsWith(
+ ReplicaNotFoundException.NON_EXISTENT_REPLICA));
+ }
+ }
+
private void testWriteToRbw(FSDataset dataSet) throws IOException {
try {
dataSet.recoverRbw(blocks[FINALIZED],