You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/09/29 20:18:15 UTC
svn commit: r820053 - in /hadoop/hdfs/branches/HDFS-265: ./
src/java/org/apache/hadoop/hdfs/
src/java/org/apache/hadoop/hdfs/server/datanode/
src/test/hdfs/org/apache/hadoop/hdfs/
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Author: hairong
Date: Tue Sep 29 18:18:14 2009
New Revision: 820053
URL: http://svn.apache.org/viewvc?rev=820053&view=rev
Log:
Revert the changes of HDFS-642 by merging -r 819843:819842
Modified:
hadoop/hdfs/branches/HDFS-265/CHANGES.txt
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Tue Sep 29 18:18:14 2009
@@ -50,8 +50,6 @@
HDFS-627. Support replica update in data-node.
(Tsz Wo (Nicholas), SZE and Hairong Kuang via shv)
- HDFS-642. Support pipeline close and close error recovery. (hairong)
-
IMPROVEMENTS
HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Sep 29 18:18:14 2009
@@ -1431,8 +1431,8 @@
int dataLen = in.readInt();
// Sanity check the lengths
- if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
- ( dataLen != 0 && lastPacketInBlock) ||
+ if ( dataLen < 0 ||
+ ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
(seqno != (lastSeqNo + 1)) ) {
throw new IOException("BlockReader: error in packet header" +
"(chunkOffset : " + chunkOffset +
@@ -2598,16 +2598,7 @@
response.start();
stage = BlockConstructionStage.DATA_STREAMING;
}
-
- private void endBlock() {
- LOG.debug("Closing old block " + block);
- this.setName("DataStreamer for file " + src);
- closeResponder();
- closeStream();
- nodes = null;
- stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
- }
-
+
/*
* streamer thread is the only thread that opens streams to datanode,
* and closes them. Any error recovery is also done by this thread.
@@ -2651,6 +2642,8 @@
one = dataQueue.getFirst();
}
+ long offsetInBlock = one.offsetInBlock;
+
// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
LOG.debug("Allocating new block");
@@ -2662,34 +2655,14 @@
initDataStreaming();
}
- long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
- if (lastByteOffsetInBlock > blockSize) {
+ if (offsetInBlock >= blockSize) {
throw new IOException("BlockSize " + blockSize +
" is smaller than data size. " +
" Offset of packet in block " +
- lastByteOffsetInBlock +
+ offsetInBlock +
" Aborting file " + src);
}
- if (one.lastPacketInBlock) {
- // wait for all data packets have been successfully acked
- synchronized (dataQueue) {
- while (!streamerClosed && !hasError &&
- ackQueue.size() != 0 && clientRunning) {
- try {
- // wait for acks to arrive from datanodes
- dataQueue.wait(1000);
- } catch (InterruptedException e) {
- }
- }
- }
- if (streamerClosed || hasError || !clientRunning) {
- continue;
- }
- stage = BlockConstructionStage.PIPELINE_CLOSE;
- }
-
- // send the packet
ByteBuffer buf = one.getBuffer();
synchronized (dataQueue) {
@@ -2701,7 +2674,11 @@
if (LOG.isDebugEnabled()) {
LOG.debug("DataStreamer block " + block +
- " sending packet " + one);
+ " sending packet seqno:" + one.seqno +
+ " size:" + buf.remaining() +
+ " offsetInBlock:" + one.offsetInBlock +
+ " lastPacketInBlock:" + one.lastPacketInBlock +
+ " lastByteOffsetInBlock" + one.getLastByteOffsetBlock());
}
// write out data to remote datanode
@@ -2713,31 +2690,22 @@
if (bytesSent < tmpBytesSent) {
bytesSent = tmpBytesSent;
}
-
- if (streamerClosed || hasError || !clientRunning) {
- continue;
- }
-
- // Is this block full?
+
if (one.lastPacketInBlock) {
- // wait for the close packet has been acked
synchronized (dataQueue) {
- while (!streamerClosed && !hasError &&
- ackQueue.size() != 0 && clientRunning) {
- dataQueue.wait(1000);// wait for acks to arrive from datanodes
+ while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
+ try {
+ dataQueue.wait(1000); // wait for acks to arrive from datanodes
+ } catch (InterruptedException e) {
+ }
}
}
- if (streamerClosed || hasError || !clientRunning) {
- continue;
+
+ if (ackQueue.isEmpty()) { // done receiving all acks
+ // indicate end-of-block
+ blockStream.writeInt(0);
+ blockStream.flush();
}
-
- endBlock();
- }
- if (progress != null) { progress.progress(); }
-
- // This is used by unit test to trigger race conditions.
- if (artificialSlowdown != 0 && clientRunning) {
- Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
LOG.warn("DataStreamer Exception: " +
@@ -2750,6 +2718,29 @@
streamerClosed = true;
}
}
+
+
+ if (streamerClosed || hasError || !clientRunning) {
+ continue;
+ }
+
+ // Is this block full?
+ if (one.lastPacketInBlock) {
+ LOG.debug("Closing old block " + block);
+ this.setName("DataStreamer for file " + src);
+ closeResponder();
+ closeStream();
+ nodes = null;
+ stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+ }
+ if (progress != null) { progress.progress(); }
+
+ // This is used by unit test to trigger race conditions.
+ if (artificialSlowdown != 0 && clientRunning) {
+ try {
+ Thread.sleep(artificialSlowdown);
+ } catch (InterruptedException e) {}
+ }
}
closeInternal();
}
@@ -2937,15 +2928,7 @@
boolean doSleep = setupPipelineForAppendOrRecovery();
if (!streamerClosed && clientRunning) {
- if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
- synchronized (dataQueue) {
- dataQueue.remove(); // remove the end of block packet
- dataQueue.notifyAll();
- }
- endBlock();
- } else {
- initDataStreaming();
- }
+ initDataStreaming();
}
return doSleep;
@@ -3409,6 +3392,15 @@
", blockSize=" + blockSize +
", appendChunk=" + appendChunk);
}
+ //
+ // if we allocated a new packet because we encountered a block
+ // boundary, reset bytesCurBlock.
+ //
+ if (bytesCurBlock == blockSize) {
+ currentPacket.lastPacketInBlock = true;
+ bytesCurBlock = 0;
+ lastFlushOffset = -1;
+ }
waitAndQueuePacket(currentPacket);
currentPacket = null;
@@ -3421,20 +3413,6 @@
}
int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
computePacketChunkSize(psize, bytesPerChecksum);
-
- //
- // if encountering a block boundary, send an empty packet to
- // indicate the end of block and reset bytesCurBlock.
- //
- if (bytesCurBlock == blockSize) {
- currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
- bytesCurBlock);
- currentPacket.lastPacketInBlock = true;
- waitAndQueuePacket(currentPacket);
- currentPacket = null;
- bytesCurBlock = 0;
- lastFlushOffset = -1;
- }
}
}
@@ -3578,22 +3556,21 @@
try {
flushBuffer(); // flush from all upper layers
- if (currentPacket != null) {
- waitAndQueuePacket(currentPacket);
- }
-
- if (bytesCurBlock != 0) {
- // send an empty packet to mark the end of the block
- currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
+ // Mark that this packet is the last packet in block.
+ // If there are no outstanding packets and the last packet
+ // was not the last one in the current block, then create a
+ // packet with empty payload.
+ if (currentPacket == null && bytesCurBlock != 0) {
+ currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock);
+ }
+ if (currentPacket != null) {
currentPacket.lastPacketInBlock = true;
}
flushInternal(); // flush all data to Datanodes
- LOG.info("Done flushing");
// get last block before destroying the streamer
Block lastBlock = streamer.getBlock();
- LOG.info("Closing the streams...");
closeThreads(false);
completeFile(lastBlock);
leasechecker.remove(src);
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Sep 29 18:18:14 2009
@@ -102,8 +102,14 @@
replicaInfo = datanode.data.createRbw(block);
break;
case PIPELINE_SETUP_STREAMING_RECOVERY:
- replicaInfo = datanode.data.recoverRbw(
- block, newGs, minBytesRcvd, maxBytesRcvd);
+ if (datanode.data.isValidBlock(block)) {
+ // pipeline failed after the replica is finalized. This will be
+ // handled differently when pipeline close/recovery is introduced
+ replicaInfo = datanode.data.append(block, newGs, maxBytesRcvd);
+ } else {
+ replicaInfo = datanode.data.recoverRbw(
+ block, newGs, minBytesRcvd, maxBytesRcvd);
+ }
block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND:
@@ -324,7 +330,7 @@
* It tries to read a full packet with single read call.
* Consecutive packets are usually of the same length.
*/
- private void readNextPacket() throws IOException {
+ private int readNextPacket() throws IOException {
/* This dances around buf a little bit, mainly to read
* full packet with single read and to accept arbitrary size
* for next packet at the same time.
@@ -360,6 +366,12 @@
int payloadLen = buf.getInt();
buf.reset();
+ if (payloadLen == 0) {
+ //end of stream!
+ buf.limit(buf.position() + SIZE_OF_INTEGER);
+ return 0;
+ }
+
// check corrupt values for pktLen, 100MB upper limit should be ok?
if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
throw new IOException("Incorrect value for packet payload : " +
@@ -399,15 +411,21 @@
if (pktSize > maxPacketReadLen) {
maxPacketReadLen = pktSize;
}
+
+ return payloadLen;
}
/**
* Receives and processes a packet. It can contain many chunks.
- * returns the number of data bytes that the packet has.
+ * returns size of the packet.
*/
private int receivePacket() throws IOException {
- // read the next packet
- readNextPacket();
+
+ int payloadLen = readNextPacket();
+
+ if (payloadLen <= 0) {
+ return payloadLen;
+ }
buf.mark();
//read the header
@@ -433,7 +451,7 @@
if (LOG.isDebugEnabled()){
LOG.debug("Receiving one packet for block " + block +
- " of length " + len +
+ " of length " + payloadLen +
" seqno " + seqno +
" offsetInBlock " + offsetInBlock +
" lastPacketInBlock " + lastPacketInBlock);
@@ -444,12 +462,6 @@
if (replicaInfo.getNumBytes() < offsetInBlock) {
replicaInfo.setNumBytes(offsetInBlock);
}
-
- // put in queue for pending acks
- if (responder != null) {
- ((PacketResponder)responder.getRunnable()).enqueue(seqno,
- lastPacketInBlock, offsetInBlock);
- }
//First write the packet to the mirror:
if (mirrorOut != null) {
@@ -463,8 +475,8 @@
buf.position(endOfHeader);
- if (lastPacketInBlock || len == 0) {
- LOG.debug("Receiving an empty packet or the end of the block " + block);
+ if (len == 0) {
+ LOG.debug("Receiving empty packet for block " + block);
} else {
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize;
@@ -527,11 +539,17 @@
/// flush entire packet before sending ack
flush();
+ // put in queue for pending acks
+ if (responder != null) {
+ ((PacketResponder)responder.getRunnable()).enqueue(seqno,
+ lastPacketInBlock, offsetInBlock);
+ }
+
if (throttler != null) { // throttle I/O
- throttler.throttle(len);
+ throttler.throttle(payloadLen);
}
- return len;
+ return payloadLen;
}
void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -560,10 +578,20 @@
}
/*
- * Receive until packet has zero bytes of data.
+ * Receive until packet length is zero.
*/
while (receivePacket() > 0) {}
+ // flush the mirror out
+ if (mirrorOut != null) {
+ try {
+ mirrorOut.writeInt(0); // mark the end of the block
+ mirrorOut.flush();
+ } catch (IOException e) {
+ handleMirrorOutError(e);
+ }
+ }
+
// wait for all outstanding packet responses. And then
// indicate responder to gracefully shutdown.
// Mark that responder has been closed for future processing
@@ -818,7 +846,9 @@
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
- datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+ datanode.myMetrics.blocksWritten.inc();
+ datanode.notifyNamenodeReceivedBlock(block,
+ DataNode.EMPTY_DEL_HINT);
if (ClientTraceLog.isInfoEnabled() &&
receiver.clientName.length() > 0) {
long offset = 0;
@@ -957,7 +987,9 @@
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
- datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+ datanode.myMetrics.blocksWritten.inc();
+ datanode.notifyNamenodeReceivedBlock(block,
+ DataNode.EMPTY_DEL_HINT);
if (ClientTraceLog.isInfoEnabled() &&
receiver.clientName.length() > 0) {
long offset = 0;
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Sep 29 18:18:14 2009
@@ -270,6 +270,10 @@
int len = Math.min((int) (endOffset - offset),
bytesPerChecksum*maxChunks);
+ if (len == 0) {
+ return 0;
+ }
+
int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
int packetLen = len + numChunks*checksumSize + 4;
pkt.clear();
@@ -278,7 +282,7 @@
pkt.putInt(packetLen);
pkt.putLong(offset);
pkt.putLong(seqno);
- pkt.put((byte)((len == 0) ? 1 : 0));
+ pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
//why no ByteBuf.putBoolean()?
pkt.putInt(len);
@@ -439,8 +443,7 @@
seqno++;
}
try {
- // send an empty packet to mark the end of the block
- sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);
+ out.writeInt(0); // mark the end of block
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Sep 29 18:18:14 2009
@@ -1275,20 +1275,6 @@
}
}
}
-
- /**
- * After a block becomes finalized, a datanode increases metric counter,
- * notifies namenode, and adds it to the block scanner
- * @param block
- * @param delHint
- */
- void closeBlock(Block block, String delHint) {
- myMetrics.blocksWritten.inc();
- notifyNamenodeReceivedBlock(block, delHint);
- if (blockScanner != null) {
- blockScanner.addBlock(block);
- }
- }
/**
* No matter what kind of exception we get, keep retrying to offerService().
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Sep 29 18:18:14 2009
@@ -252,17 +252,12 @@
String firstBadLink = ""; // first datanode that failed in connection setup
DataTransferProtocol.Status mirrorInStatus = SUCCESS;
try {
- if (client.length() == 0 ||
- stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
- // open a block receiver
- blockReceiver = new BlockReceiver(block, in,
- s.getRemoteSocketAddress().toString(),
- s.getLocalSocketAddress().toString(),
- stage, newGs, minBytesRcvd, maxBytesRcvd,
- client, srcDataNode, datanode);
- } else {
- datanode.data.recoverClose(block, newGs, minBytesRcvd);
- }
+ // open a block receiver and check if the block does not exist
+ blockReceiver = new BlockReceiver(block, in,
+ s.getRemoteSocketAddress().toString(),
+ s.getLocalSocketAddress().toString(),
+ stage, newGs, minBytesRcvd, maxBytesRcvd,
+ client, srcDataNode, datanode);
//
// Open network conn to backup machine, if
@@ -294,9 +289,7 @@
pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, client,
srcDataNode, targets, accessToken);
- if (blockReceiver != null) { // send checksum header
- blockReceiver.writeChecksumHeader(mirrorOut);
- }
+ blockReceiver.writeChecksumHeader(mirrorOut);
mirrorOut.flush();
// read connect ack (only for clients, not for replication req)
@@ -347,30 +340,24 @@
}
// receive the block and mirror to the next target
- if (blockReceiver != null) {
- String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
- blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
- mirrorAddr, null, targets.length);
- }
+ String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
+ blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
+ mirrorAddr, null, targets.length);
- // update its generation stamp
- if (client.length() != 0 &&
- stage != BlockConstructionStage.PIPELINE_SETUP_CREATE) {
- block.setGenerationStamp(newGs);
- }
-
- // if this write is for a replication request or recovering
- // a failed close for client, then confirm block. For other client-writes,
+ // if this write is for a replication request (and not
+ // from a client), then confirm block. For client-writes,
// the block is finalized in the PacketResponder.
- if (client.length() == 0 ||
- stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
- datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+ if (client.length() == 0) {
+ datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
LOG.info("Received block " + block +
" src: " + remoteAddress +
" dest: " + localAddress +
" of size " + block.getNumBytes());
}
+ if (datanode.blockScanner != null) {
+ datanode.blockScanner.addBlock(block);
+ }
} catch (IOException ioe) {
LOG.info("writeBlock " + block + " received exception " + ioe);
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Sep 29 18:18:14 2009
@@ -1175,21 +1175,17 @@
return newReplicaInfo;
}
- private ReplicaInfo recoverCheck(Block b, long newGS,
- long expectedBlockLen) throws IOException {
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+ long newGS, long expectedBlockLen) throws IOException {
+ DataNode.LOG.info("Recover failed append to " + b);
+
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
if (replicaInfo == null) {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
}
- // check state
- if (replicaInfo.getState() != ReplicaState.FINALIZED &&
- replicaInfo.getState() != ReplicaState.RBW) {
- throw new ReplicaNotFoundException(
- ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
- }
-
// check generation stamp
long replicaGenerationStamp = replicaInfo.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1223,39 +1219,20 @@
" with a length of " + replicaLen +
" expected length is " + expectedBlockLen);
}
-
- return replicaInfo;
- }
- @Override // FSDatasetInterface
- public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
- long newGS, long expectedBlockLen) throws IOException {
- DataNode.LOG.info("Recover failed append to " + b);
-
- ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// change the replica's state/gs etc.
- if (replicaInfo.getState() == ReplicaState.FINALIZED ) {
+ switch (replicaInfo.getState()) {
+ case FINALIZED:
return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
- } else { //RBW
+ case RBW:
bumpReplicaGS(replicaInfo, newGS);
return (ReplicaBeingWritten)replicaInfo;
+ default:
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
}
}
- @Override
- public void recoverClose(Block b, long newGS,
- long expectedBlockLen) throws IOException {
- DataNode.LOG.info("Recover failed close " + b);
- // check replica's state
- ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
- // bump the replica's GS
- bumpReplicaGS(replicaInfo, newGS);
- // finalize the replica if RBW
- if (replicaInfo.getState() == ReplicaState.RBW) {
- finalizeBlock(replicaInfo);
- }
- }
-
/**
* Bump a replica's generation stamp to a new one.
* Its on-disk meta file name is renamed to be the new one too.
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Tue Sep 29 18:18:14 2009
@@ -240,18 +240,6 @@
long newGS, long expectedBlockLen) throws IOException;
/**
- * Recover a failed pipeline close
- * It bumps the replica's generation stamp and finalize it if RBW replica
- *
- * @param b block
- * @param newGS the new generation stamp for the replica
- * @param expectedBlockLen the number of bytes the replica is expected to have
- * @throws IOException
- */
- public void recoverClose(Block b,
- long newGS, long expectedBlockLen) throws IOException;
-
- /**
* Update the block to the new generation stamp and length.
*/
public void updateBlock(Block oldblock, Block newblock) throws IOException;
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java Tue Sep 29 18:18:14 2009
@@ -33,7 +33,7 @@
final static String UNFINALIZED_REPLICA =
"Cannot append to an unfinalized replica ";
final static String UNFINALIZED_AND_NONRBW_REPLICA =
- "Cannot recover append/close to a replica that's not FINALIZED and not RBW ";
+ "Cannot recover appending to a replica that's not FINALIZED and not RBW ";
final static String NON_EXISTENT_REPLICA =
"Cannot append to a non-existent replica ";
final static String UNEXPECTED_GS_REPLICA =
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Sep 29 18:18:14 2009
@@ -46,6 +46,7 @@
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -153,7 +154,10 @@
sendOut.writeInt(0); // chunk length
sendOut.writeInt(0); // zero checksum
-
+
+ // mark the end of block
+ sendOut.writeInt(0);
+
//ok finally write a block with 0 len
SUCCESS.write(recvOut);
Text.writeString(recvOut, ""); // first bad node
@@ -173,11 +177,6 @@
if (eofExcepted) {
ERROR.write(recvOut);
sendRecvData(description, true);
- } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
- //ok finally write a block with 0 len
- SUCCESS.write(recvOut);
- Text.writeString(recvOut, ""); // first bad node
- sendRecvData(description, false);
} else {
writeZeroLengthPacket(block, description);
}
@@ -209,7 +208,8 @@
long newGS = firstBlock.getGenerationStamp() + 1;
testWrite(firstBlock,
BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
- newGS, "Cannot recover data streaming to a finalized replica", true);
+ newGS, "Successful for now", false);
+ firstBlock.setGenerationStamp(newGS);
// test PIPELINE_SETUP_APPEND on an existing block
newGS = firstBlock.getGenerationStamp() + 1;
testWrite(firstBlock,
@@ -217,21 +217,10 @@
newGS, "Append to a finalized replica", false);
firstBlock.setGenerationStamp(newGS);
// test PIPELINE_SETUP_APPEND_RECOVERY on an existing block
- file = new Path("dataprotocol1.dat");
- DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
- firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
newGS = firstBlock.getGenerationStamp() + 1;
testWrite(firstBlock,
BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS,
"Recover appending to a finalized replica", false);
- // test PIPELINE_CLOSE_RECOVERY on an existing block
- file = new Path("dataprotocol2.dat");
- DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
- firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
- newGS = firstBlock.getGenerationStamp() + 1;
- testWrite(firstBlock,
- BlockConstructionStage.PIPELINE_CLOSE_RECOVERY, newGS,
- "Recover failed close to a finalized replica", false);
firstBlock.setGenerationStamp(newGS);
/* Test writing to a new block */
@@ -287,19 +276,11 @@
newGS, "Recover append to a RBW replica", false);
firstBlock.setGenerationStamp(newGS);
// test PIPELINE_SETUP_STREAMING_RECOVERY on a RBW block
- file = new Path("dataprotocol2.dat");
- DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
- out = (DFSOutputStream)(fileSys.append(file).
- getWrappedStream());
- out.write(1);
- out.hflush();
- in = fileSys.open(file);
- firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
- firstBlock.setNumBytes(2L);
newGS = firstBlock.getGenerationStamp() + 1;
testWrite(firstBlock,
BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
newGS, "Recover a RBW replica", false);
+ firstBlock.setGenerationStamp(newGS);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java Tue Sep 29 18:18:14 2009
@@ -36,7 +36,7 @@
/** This class implements some of tests posted in HADOOP-2658. */
public class TestFileAppend3 extends junit.framework.TestCase {
- static final long BLOCK_SIZE = 64 * 1024;
+ static final long BLOCK_SIZE = 3 * 64 * 1024;
static final short REPLICATION = 3;
static final int DATANODE_NUM = 5;
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Tue Sep 29 18:18:14 2009
@@ -48,7 +48,6 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.log4j.Level;
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Tue Sep 29 18:18:14 2009
@@ -484,36 +484,18 @@
public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
long newGS, long expectedBlockLen) throws IOException {
BInfo binfo = blockMap.get(b);
- if (binfo == null) {
+ if (binfo == null || !binfo.isFinalized()) {
throw new ReplicaNotFoundException("Block " + b
+ " is not valid, and cannot be appended to.");
}
if (binfo.isFinalized()) {
binfo.unfinalizeBlock();
}
- blockMap.remove(b);
binfo.theBlock.setGenerationStamp(newGS);
- blockMap.put(binfo.theBlock, binfo);
return binfo;
}
@Override
- public void recoverClose(Block b, long newGS,
- long expectedBlockLen) throws IOException {
- BInfo binfo = blockMap.get(b);
- if (binfo == null) {
- throw new ReplicaNotFoundException("Block " + b
- + " is not valid, and cannot be appended to.");
- }
- if (!binfo.isFinalized()) {
- binfo.finalizeBlock(binfo.getNumBytes());
- }
- blockMap.remove(b);
- binfo.theBlock.setGenerationStamp(newGS);
- blockMap.put(binfo.theBlock, binfo);
- }
-
- @Override
public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
BInfo binfo = blockMap.get(b);
@@ -525,9 +507,7 @@
throw new ReplicaAlreadyExistsException("Block " + b
+ " is valid, and cannot be written to.");
}
- blockMap.remove(b);
binfo.theBlock.setGenerationStamp(newGS);
- blockMap.put(binfo.theBlock, binfo);
return binfo;
}
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Tue Sep 29 18:18:14 2009
@@ -32,11 +32,8 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Sender;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AccessToken;
@@ -117,12 +114,17 @@
DataOutputStream out = new DataOutputStream(
s.getOutputStream());
- Sender.opWriteBlock(out, block.getBlock().getBlockId(),
- block.getBlock().getGenerationStamp(), 1,
- BlockConstructionStage.PIPELINE_SETUP_CREATE,
- 0L, 0L, 0L, "", null, new DatanodeInfo[0],
- AccessToken.DUMMY_TOKEN);
-
+ out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
+ WRITE_BLOCK.write(out);
+ out.writeLong( block.getBlock().getBlockId());
+ out.writeLong( block.getBlock().getGenerationStamp() );
+ out.writeInt(1);
+ out.writeBoolean( false ); // recovery flag
+ Text.writeString( out, "" );
+ out.writeBoolean(false); // Not sending src node information
+ out.writeInt(0);
+ AccessToken.DUMMY_TOKEN.write(out);
+
// write check header
out.writeByte( 1 );
out.writeInt( 512 );
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java?rev=820053&r1=820052&r2=820053&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java Tue Sep 29 18:18:14 2009
@@ -41,25 +41,6 @@
final private static int RUR = 4;
final private static int NON_EXISTENT = 5;
- // test close
- @Test
- public void testClose() throws Exception {
- MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
- try {
- cluster.waitActive();
- DataNode dn = cluster.getDataNodes().get(0);
- FSDataset dataSet = (FSDataset)dn.data;
-
- // set up replicasMap
- setup(dataSet);
-
- // test close
- testClose(dataSet);
- } finally {
- cluster.shutdown();
- }
- }
-
// test append
@Test
public void testAppend() throws Exception {
@@ -98,7 +79,7 @@
}
}
- // test writeToTemporary
+ // test writeToRbw
@Test
public void testWriteToTempoary() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
@@ -132,8 +113,9 @@
blocks[TEMPORARY].getGenerationStamp(), vol,
vol.createTmpFile(blocks[TEMPORARY]).getParentFile()));
- replicaInfo = new ReplicaBeingWritten(blocks[RBW], vol,
- vol.createRbwFile(blocks[RBW]).getParentFile(), null);
+ replicaInfo = new ReplicaBeingWritten(blocks[RBW].getBlockId(),
+ blocks[RBW].getGenerationStamp(), vol,
+ vol.createRbwFile(blocks[RBW]).getParentFile());
replicasMap.add(replicaInfo);
replicaInfo.getBlockFile().createNewFile();
replicaInfo.getMetaFile().createNewFile();
@@ -145,10 +127,8 @@
}
private void testAppend(FSDataset dataSet) throws IOException {
- long newGS = blocks[FINALIZED].getGenerationStamp()+1;
- dataSet.append(blocks[FINALIZED], newGS,
+ dataSet.append(blocks[FINALIZED], blocks[FINALIZED].getGenerationStamp()+1,
blocks[FINALIZED].getNumBytes()); // successful
- blocks[FINALIZED].setGenerationStamp(newGS);
try {
dataSet.append(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1,
@@ -197,106 +177,8 @@
Assert.assertEquals(ReplicaNotFoundException.NON_EXISTENT_REPLICA +
blocks[NON_EXISTENT], e.getMessage());
}
-
- newGS = blocks[FINALIZED].getGenerationStamp()+1;
- dataSet.recoverAppend(blocks[FINALIZED], newGS,
- blocks[FINALIZED].getNumBytes()); // successful
- blocks[FINALIZED].setGenerationStamp(newGS);
-
- try {
- dataSet.recoverAppend(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1,
- blocks[TEMPORARY].getNumBytes());
- Assert.fail("Should not have appended to a temporary replica "
- + blocks[TEMPORARY]);
- } catch (ReplicaNotFoundException e) {
- Assert.assertTrue(e.getMessage().startsWith(
- ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
- }
-
- newGS = blocks[RBW].getGenerationStamp()+1;
- dataSet.recoverAppend(blocks[RBW], newGS, blocks[RBW].getNumBytes());
- blocks[RBW].setGenerationStamp(newGS);
-
- try {
- dataSet.recoverAppend(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
- blocks[RBW].getNumBytes());
- Assert.fail("Should not have appended to an RWR replica" + blocks[RWR]);
- } catch (ReplicaNotFoundException e) {
- Assert.assertTrue(e.getMessage().startsWith(
- ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
- }
-
- try {
- dataSet.recoverAppend(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
- blocks[RUR].getNumBytes());
- Assert.fail("Should not have appended to an RUR replica" + blocks[RUR]);
- } catch (ReplicaNotFoundException e) {
- Assert.assertTrue(e.getMessage().startsWith(
- ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
- }
-
- try {
- dataSet.recoverAppend(blocks[NON_EXISTENT],
- blocks[NON_EXISTENT].getGenerationStamp(),
- blocks[NON_EXISTENT].getNumBytes());
- Assert.fail("Should not have appended to a non-existent replica " +
- blocks[NON_EXISTENT]);
- } catch (ReplicaNotFoundException e) {
- Assert.assertTrue(e.getMessage().startsWith(
- ReplicaNotFoundException.NON_EXISTENT_REPLICA));
- }
}
- private void testClose(FSDataset dataSet) throws IOException {
- long newGS = blocks[FINALIZED].getGenerationStamp()+1;
- dataSet.recoverClose(blocks[FINALIZED], newGS,
- blocks[FINALIZED].getNumBytes()); // successful
- blocks[FINALIZED].setGenerationStamp(newGS);
-
- try {
- dataSet.recoverClose(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1,
- blocks[TEMPORARY].getNumBytes());
- Assert.fail("Should not have recovered close a temporary replica "
- + blocks[TEMPORARY]);
- } catch (ReplicaNotFoundException e) {
- Assert.assertTrue(e.getMessage().startsWith(
- ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
- }
-
- newGS = blocks[RBW].getGenerationStamp()+1;
- dataSet.recoverClose(blocks[RBW], newGS, blocks[RBW].getNumBytes());
- blocks[RBW].setGenerationStamp(newGS);
-
- try {
- dataSet.recoverClose(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
- blocks[RBW].getNumBytes());
- Assert.fail("Should not have recovered close an RWR replica" + blocks[RWR]);
- } catch (ReplicaNotFoundException e) {
- Assert.assertTrue(e.getMessage().startsWith(
- ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
- }
-
- try {
- dataSet.recoverClose(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
- blocks[RUR].getNumBytes());
- Assert.fail("Should not have recovered close an RUR replica" + blocks[RUR]);
- } catch (ReplicaNotFoundException e) {
- Assert.assertTrue(e.getMessage().startsWith(
- ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
- }
-
- try {
- dataSet.recoverClose(blocks[NON_EXISTENT],
- blocks[NON_EXISTENT].getGenerationStamp(),
- blocks[NON_EXISTENT].getNumBytes());
- Assert.fail("Should not have recovered close a non-existent replica " +
- blocks[NON_EXISTENT]);
- } catch (ReplicaNotFoundException e) {
- Assert.assertTrue(e.getMessage().startsWith(
- ReplicaNotFoundException.NON_EXISTENT_REPLICA));
- }
- }
-
private void testWriteToRbw(FSDataset dataSet) throws IOException {
try {
dataSet.recoverRbw(blocks[FINALIZED],