Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/09/24 22:51:39 UTC
svn commit: r818622 [1/2] - in /hadoop/hdfs/branches/HDFS-265: ./
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/datanode/
src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/aop/...
Author: hairong
Date: Thu Sep 24 20:51:38 2009
New Revision: 818622
URL: http://svn.apache.org/viewvc?rev=818622&view=rev
Log:
HDFS-624. Support a new algorithm for pipeline recovery and pipeline setup for append. Contributed by Hairong Kuang.
Added:
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java
- copied, changed from r818583, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockAlreadyExistsException.java
Removed:
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockAlreadyExistsException.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockNotFoundException.java
Modified:
hadoop/hdfs/branches/HDFS-265/CHANGES.txt
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/hdfs/branches/HDFS-265/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
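Before the per-file diffs, here is a condensed sketch of the client-side algorithm this patch introduces, assembled from the DataStreamer changes below. It is a sketch, not verbatim code: removeBadNode stands in for two inline System.arraycopy calls (sketched after the DFSClient.java diff), and logging and abort paths are elided.

    // Sketch of the new recovery/append-setup loop in DFSClient.DataStreamer.
    private boolean setupPipelineForAppendOrRecovery() throws IOException {
      boolean success = false;
      long newGS = 0L;
      while (!success && !streamerClosed && clientRunning) {
        boolean isRecovery = hasError;
        if (errorIndex >= 0) {                        // a datanode was blamed
          nodes = removeBadNode(nodes, errorIndex);   // hypothetical helper
          hasError = false;
          lastException = null;
          errorIndex = -1;
        }
        // 1. fetch a new generation stamp and access token from the NameNode
        LocatedBlock lb = namenode.updateBlockForPipeline(block, clientName);
        newGS = lb.getBlock().getGenerationStamp();
        accessToken = lb.getAccessToken();
        // 2. re-open the write pipeline to the surviving datanodes
        success = createBlockOutputStream(nodes, newGS, isRecovery);
      }
      if (success) {
        // 3. commit the new generation stamp and pipeline at the NameNode
        Block newBlock = new Block(block.getBlockId(), block.getNumBytes(), newGS);
        namenode.updatePipeline(clientName, block, newBlock, nodes);
        block = newBlock;
      }
      return false;
    }

Unlike the old scheme, no primary datanode runs recoverBlock; the client drives recovery and the NameNode validates and persists the new pipeline.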
Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Thu Sep 24 20:51:38 2009
@@ -35,6 +35,9 @@
HDFS-592. Allow clients to fetch a new generation stamp from NameNode for
pipeline recovery. (hairong)
+ HDFS-624. Support a new algorithm for pipeline recovery and pipeline setup
+ for append. (hairong)
+
IMPROVEMENTS
HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu Sep 24 20:51:38 2009
@@ -36,8 +36,6 @@
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -2425,6 +2423,13 @@
long getLastByteOffsetBlock() {
return offsetInBlock + dataPos - dataStart;
}
+
+ public String toString() {
+ return "packet seqno:" + this.seqno +
+ " offsetInBlock:" + this.offsetInBlock +
+ " lastPacketInBlock:" + this.lastPacketInBlock +
+ " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
+ }
}
//
@@ -2436,8 +2441,6 @@
// if they are received, the DataStreamer closes the current block.
//
class DataStreamer extends Daemon {
- private static final int MAX_RECOVERY_ERROR_COUNT = 5; // try block recovery 5 times
- private int recoveryErrorCount = 0; // number of times block recovery failed
private volatile boolean streamerClosed = false;
private Block block; // its length is number of bytes acked
private AccessToken accessToken;
@@ -2446,7 +2449,7 @@
private ResponseProcessor response = null;
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
volatile boolean hasError = false;
- volatile int errorIndex = 0;
+ volatile int errorIndex = -1;
private BlockConstructionStage stage; // block construction stage
private long bytesSent = 0; // number of bytes that've been sent
@@ -2511,7 +2514,17 @@
"of file " + src);
}
- processDatanodeError(true, true);
+ }
+
+ /**
+ * Initialize for data streaming
+ */
+ private void initDataStreaming() {
+ this.setName("DataStreamer for file " + src +
+ " block " + block);
+ response = new ResponseProcessor(nodes);
+ response.start();
+ stage = BlockConstructionStage.DATA_STREAMING;
}
/*
@@ -2533,38 +2546,41 @@
Packet one = null;
- // process IO errors if any
- boolean doSleep = processDatanodeError(hasError, false);
+ try {
+ // process datanode IO errors if any
+ boolean doSleep = false;
+ if (hasError && errorIndex>=0) {
+ doSleep = processDatanodeError();
+ }
- synchronized (dataQueue) {
- // wait for a packet to be sent.
- while ((!streamerClosed && !hasError && clientRunning
- && dataQueue.size() == 0) || doSleep) {
- try {
- dataQueue.wait(1000);
- } catch (InterruptedException e) {
+ synchronized (dataQueue) {
+ // wait for a packet to be sent.
+ while ((!streamerClosed && !hasError && clientRunning
+ && dataQueue.size() == 0) || doSleep) {
+ try {
+ dataQueue.wait(1000);
+ } catch (InterruptedException e) {
+ }
+ doSleep = false;
}
- doSleep = false;
- }
- if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
- continue;
+ if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
+ continue;
+ }
+ // get packet to be sent.
+ one = dataQueue.getFirst();
}
- // get packet to be sent.
- one = dataQueue.getFirst();
- }
- try {
long offsetInBlock = one.offsetInBlock;
// get new block from namenode.
- if (blockStream == null) {
+ if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
LOG.debug("Allocating new block");
- nodes = nextBlockOutputStream(src);
- this.setName("DataStreamer for file " + src +
- " block " + block);
- response = new ResponseProcessor(nodes);
- response.start();
- stage = BlockConstructionStage.DATA_STREAMING;
+ nodes = nextBlockOutputStream(src);
+ initDataStreaming();
+ } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
+ LOG.debug("Append to block " + block);
+ setupPipelineForAppendOrRecovery();
+ initDataStreaming();
}
if (offsetInBlock >= blockSize) {
@@ -2584,6 +2600,15 @@
dataQueue.notifyAll();
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DataStreamer block " + block +
+ " sending packet seqno:" + one.seqno +
+ " size:" + buf.remaining() +
+ " offsetInBlock:" + one.offsetInBlock +
+ " lastPacketInBlock:" + one.lastPacketInBlock +
+ " lastByteOffsetInBlock" + one.getLastByteOffsetBlock());
+ }
+
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
blockStream.flush();
@@ -2610,13 +2635,6 @@
blockStream.flush();
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("DataStreamer block " + block +
- " wrote packet seqno:" + one.seqno +
- " size:" + buf.remaining() +
- " offsetInBlock:" + one.offsetInBlock +
- " lastPacketInBlock:" + one.lastPacketInBlock);
- }
} catch (Throwable e) {
LOG.warn("DataStreamer Exception: " +
StringUtils.stringifyException(e));
@@ -2624,6 +2642,9 @@
setLastException((IOException)e);
}
hasError = true;
+ if (errorIndex == -1) { // not a datanode error
+ streamerClosed = true;
+ }
}
@@ -2751,10 +2772,20 @@
}
// processes response status from all datanodes.
+ String replies = null;
+ if (LOG.isDebugEnabled()) {
+ replies = "DFSClient Replies for seqno " + seqno + " are";
+ }
for (int i = 0; i < targets.length && clientRunning; i++) {
final DataTransferProtocol.Status reply
= DataTransferProtocol.Status.read(blockReplyStream);
+ if (LOG.isDebugEnabled()) {
+ replies += " " + reply;
+ }
if (reply != SUCCESS) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(replies);
+ }
errorIndex = i; // first bad datanode
throw new IOException("Bad response " + reply +
" for block " + block +
@@ -2763,11 +2794,18 @@
}
}
- if (one != null) {
- // update bytesAcked
- block.setNumBytes(one.getLastByteOffsetBlock());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(replies);
}
+ if (one == null) {
+ throw new IOException("Panic: responder did not receive " +
+ "an ack for a packet: " + seqno);
+ }
+
+ // update bytesAcked
+ block.setNumBytes(one.getLastByteOffsetBlock());
+
synchronized (dataQueue) {
ackQueue.removeFirst();
dataQueue.notifyAll();
@@ -2778,6 +2816,7 @@
setLastException((IOException)e);
}
hasError = true;
+ errorIndex = errorIndex==-1 ? 0 : errorIndex;
synchronized (dataQueue) {
dataQueue.notifyAll();
}
@@ -2800,21 +2839,12 @@
// threads and mark stream as closed. Returns true if we should
// sleep for a while after returning from this call.
//
- private boolean processDatanodeError(boolean error, boolean isAppend) {
- if (!error) {
- return false;
- }
+ private boolean processDatanodeError() throws IOException {
if (response != null) {
LOG.info("Error Recovery for block " + block +
" waiting for responder to exit. ");
return true;
}
- if (errorIndex >= 0) {
- LOG.warn("Error Recovery for block " + block
- + " bad datanode[" + errorIndex + "] "
- + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
- }
-
closeStream();
// move packets from ack queue to front of the data queue
@@ -2823,31 +2853,49 @@
ackQueue.clear();
}
+ boolean doSleep = setupPipelineForAppendOrRecovery();
+
+ if (!streamerClosed && clientRunning) {
+ initDataStreaming();
+ }
+
+ return doSleep;
+ }
+
+
+ /**
+ * Open a DataOutputStream to a DataNode pipeline so that
+ * it can be written to.
+ * This happens when a file is appended or data streaming fails.
+ * It keeps on trying until a pipeline is set up.
+ */
+ private boolean setupPipelineForAppendOrRecovery() throws IOException {
+ // check number of datanodes
+ if (nodes == null || nodes.length == 0) {
+ String msg = "Could not get block locations. " + "Source file \""
+ + src + "\" - Aborting...";
+ LOG.warn(msg);
+ setLastException(new IOException(msg));
+ streamerClosed = true;
+ return false;
+ }
+
boolean success = false;
+ long newGS = 0L;
while (!success && !streamerClosed && clientRunning) {
- DatanodeInfo[] newnodes = null;
- if (nodes == null) {
- String msg = "Could not get block locations. " + "Source file \""
- + src + "\" - Aborting...";
- LOG.warn(msg);
- setLastException(new IOException(msg));
- streamerClosed = true;
- return false;
- }
- StringBuilder pipelineMsg = new StringBuilder();
- for (int j = 0; j < nodes.length; j++) {
- pipelineMsg.append(nodes[j].getName());
- if (j < nodes.length - 1) {
- pipelineMsg.append(", ");
- }
- }
+ boolean isRecovery = hasError;
// remove bad datanode from list of datanodes.
// If errorIndex was not set (i.e. appends), then do not remove
// any datanodes
//
- if (errorIndex < 0) {
- newnodes = nodes;
- } else {
+ if (errorIndex >= 0) {
+ StringBuilder pipelineMsg = new StringBuilder();
+ for (int j = 0; j < nodes.length; j++) {
+ pipelineMsg.append(nodes[j].getName());
+ if (j < nodes.length - 1) {
+ pipelineMsg.append(", ");
+ }
+ }
if (nodes.length <= 1) {
lastException = new IOException("All datanodes " + pipelineMsg
+ " are bad. Aborting...");
@@ -2857,86 +2905,32 @@
LOG.warn("Error Recovery for block " + block +
" in pipeline " + pipelineMsg +
": bad datanode " + nodes[errorIndex].getName());
- newnodes = new DatanodeInfo[nodes.length-1];
+ DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
newnodes.length-errorIndex);
+ nodes = newnodes;
+ this.hasError = false;
+ lastException = null;
+ errorIndex = -1;
}
- // Tell the primary datanode to do error recovery
- // by stamping appropriate generation stamps.
- //
- LocatedBlock newBlock = null;
- ClientDatanodeProtocol primary = null;
- DatanodeInfo primaryNode = null;
- try {
- // Pick the "least" datanode as the primary datanode to avoid deadlock.
- primaryNode = Collections.min(Arrays.asList(newnodes));
- primary = createClientDatanodeProtocolProxy(primaryNode, conf);
- newBlock = primary.recoverBlock(block, isAppend, newnodes);
- } catch (IOException e) {
- recoveryErrorCount++;
- if (recoveryErrorCount > MAX_RECOVERY_ERROR_COUNT) {
- if (nodes.length > 1) {
- // if the primary datanode failed, remove it from the list.
- // The original bad datanode is left in the list because it is
- // conservative to remove only one datanode in one iteration.
- for (int j = 0; j < nodes.length; j++) {
- if (nodes[j].equals(primaryNode)) {
- errorIndex = j; // forget original bad node.
- }
- }
- // remove primary node from list
- newnodes = new DatanodeInfo[nodes.length-1];
- System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
- System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
- newnodes.length-errorIndex);
- nodes = newnodes;
- LOG.warn("Error Recovery for block " + block + " failed "
- + " because recovery from primary datanode " + primaryNode
- + " failed " + recoveryErrorCount + " times. "
- + " Pipeline was " + pipelineMsg
- + ". Marking primary datanode as bad.");
- recoveryErrorCount = 0;
- errorIndex = -1;
- return true; // sleep when we return from here
- }
- String emsg = "Error Recovery for block " + block + " failed "
- + " because recovery from primary datanode " + primaryNode
- + " failed " + recoveryErrorCount + " times. "
- + " Pipeline was " + pipelineMsg + ". Aborting...";
- LOG.warn(emsg);
- lastException = new IOException(emsg);
- streamerClosed = true;
- return false; // abort with IOexception
- }
- LOG.warn("Error Recovery for block " + block + " failed "
- + " because recovery from primary datanode " + primaryNode
- + " failed " + recoveryErrorCount + " times. "
- + " Pipeline was " + pipelineMsg + ". Will retry...");
- return true; // sleep when we return from here
- } finally {
- RPC.stopProxy(primary);
- }
- recoveryErrorCount = 0; // block recovery successful
-
- // If the block recovery generated a new generation stamp, use that
- // from now on. Also, setup new pipeline
- // newBlock should never be null and it should contain a newly
- // generated access token.
- block = newBlock.getBlock();
- accessToken = newBlock.getAccessToken();
- nodes = newBlock.getLocations();
-
- this.hasError = false;
- lastException = null;
- errorIndex = 0;
- success = createBlockOutputStream(nodes, clientName, true);
+ // get a new generation stamp and an access token
+ LocatedBlock lb = namenode.updateBlockForPipeline(block, clientName);
+ newGS = lb.getBlock().getGenerationStamp();
+ accessToken = lb.getAccessToken();
+
+ // set up the pipeline again with the remaining nodes
+ success = createBlockOutputStream(nodes, newGS, isRecovery);
}
- if (!streamerClosed && clientRunning) {
- response = new ResponseProcessor(nodes);
- response.start();
+ if (success) {
+ // update pipeline at the namenode
+ Block newBlock = new Block(
+ block.getBlockId(), block.getNumBytes(), newGS);
+ namenode.updatePipeline(clientName, block, newBlock, nodes);
+ // update client side generation stamp
+ block = newBlock;
}
return false; // do not sleep, continue processing
}
@@ -2956,7 +2950,7 @@
do {
hasError = false;
lastException = null;
- errorIndex = 0;
+ errorIndex = -1;
retry = false;
success = false;
@@ -2970,7 +2964,7 @@
//
// Connect to first DataNode in the list.
//
- success = createBlockOutputStream(nodes, clientName, false);
+ success = createBlockOutputStream(nodes, 0L, false);
if (!success) {
LOG.info("Abandoning block " + block);
@@ -2998,7 +2992,7 @@
// connects to the first datanode in the pipeline
// Returns true if success, otherwise return failure.
//
- private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
+ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
boolean recoveryFlag) {
DataTransferProtocol.Status pipelineStatus = SUCCESS;
String firstBadLink = "";
@@ -3033,11 +3027,11 @@
DataNode.SMALL_BUFFER_SIZE));
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
- // send the request: newGS now uses a dummy value 0 for now
+ // send the request
DataTransferProtocol.Sender.opWriteBlock(out,
block.getBlockId(), block.getGenerationStamp(),
- nodes.length, recoveryFlag?stage.getRecoveryStage():stage, 0,
- block.getNumBytes(), bytesSent, client, null, nodes, accessToken);
+ nodes.length, recoveryFlag?stage.getRecoveryStage():stage, newGS,
+ block.getNumBytes(), bytesSent, clientName, null, nodes, accessToken);
checksum.writeHeader(out);
out.flush();
@@ -3070,6 +3064,8 @@
break;
}
}
+ } else {
+ errorIndex = 0;
}
hasError = true;
setLastException(ie);
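The bad-datanode removal inside setupPipelineForAppendOrRecovery is done with two System.arraycopy calls; the equivalent logic as a standalone helper (hypothetical, not part of the patch) is:

    /** Returns a copy of nodes with the entry at badIndex removed (illustrative). */
    private static DatanodeInfo[] removeBadNode(DatanodeInfo[] nodes, int badIndex) {
      DatanodeInfo[] rest = new DatanodeInfo[nodes.length - 1];
      System.arraycopy(nodes, 0, rest, 0, badIndex);
      System.arraycopy(nodes, badIndex + 1, rest, badIndex,
          rest.length - badIndex);
      return rest;
    }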
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Thu Sep 24 20:51:38 2009
@@ -44,8 +44,9 @@
* Compared to the previous version the following changes have been introduced:
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
- * 49: added a new method getNewStampForPipeline(Block, String)
- * to support pipeline recovery
+ * 49: added two new methods to support pipeline recovery and append
+ * updateBlockForPipeline(Block, String) and
+ * updatePipeline(String, Block, Block, DatanodeID[])
*/
public static final long versionID = 49L;
@@ -524,6 +525,19 @@
* @return a located block with a new generation stamp and an access token
* @throws IOException if any error occurs
*/
- public LocatedBlock getNewStampForPipeline(Block block, String clientName)
+ public LocatedBlock updateBlockForPipeline(Block block, String clientName)
+ throws IOException;
+
+ /**
+ * Update a pipeline for a block under construction
+ *
+ * @param clientName the name of the client
+ * @param oldBlock the old block
+ * @param newBlock the new block containing new generation stamp and length
+ * @param newNodes datanodes in the pipeline
+ * @throws IOException if any error occurs
+ */
+ public void updatePipeline(String clientName, Block oldBlock,
+ Block newBlock, DatanodeID[] newNodes)
throws IOException;
}
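Taken together, the two methods form a two-phase protocol. A hypothetical caller (newLength and newNodes are placeholders; the constraints are enforced by the FSNamesystem changes later in this commit) would look roughly like:

    // Phase 1: the lease holder asks for a new GS and access token for an
    // under-construction block.
    LocatedBlock lb = namenode.updateBlockForPipeline(oldBlock, clientName);
    long newGS = lb.getBlock().getGenerationStamp();
    // ... rebuild the datanode pipeline using newGS ...
    // Phase 2: commit. newBlock must carry a strictly newer generation stamp
    // and a length no smaller than oldBlock's, or the NameNode rejects it.
    Block newBlock = new Block(oldBlock.getBlockId(), newLength, newGS);
    namenode.updatePipeline(clientName, oldBlock, newBlock, newNodes);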
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Sep 24 20:51:38 2009
@@ -56,7 +56,6 @@
static final Log ClientTraceLog = DataNode.ClientTraceLog;
private Block block; // the block to receive
- protected boolean finalized;
private DataInputStream in = null; // from where data are read
private DataChecksum checksum; // from where chunks of a block can be read
private OutputStream out = null; // to block file at local disk
@@ -73,7 +72,6 @@
private Daemon responder = null;
private BlockTransferThrottler throttler;
private FSDataset.BlockWriteStreams streams;
- private boolean isRecovery = false;
private String clientName;
DatanodeInfo srcDataNode = null;
private Checksum partialCrc = null;
@@ -93,34 +91,41 @@
this.clientName = clientName;
this.srcDataNode = srcDataNode;
this.datanode = datanode;
- this.finalized = datanode.data.isValidBlock(block);
//
// Open local disk out
//
if (clientName.length() == 0) { //replication or move
- replicaInfo = datanode.data.writeToTemporary(block);
+ replicaInfo = datanode.data.createTemporary(block);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
- isRecovery = false;
- replicaInfo = datanode.data.writeToRbw(block, isRecovery);
+ replicaInfo = datanode.data.createRbw(block);
break;
case PIPELINE_SETUP_STREAMING_RECOVERY:
- isRecovery = true;
- if (finalized) {
- replicaInfo = datanode.data.append(block);
- finalized = false;
+ if (datanode.data.isValidBlock(block)) {
+ // pipeline failed after the replica is finalized. This will be
+ // handled differently when pipeline close/recovery is introduced
+ replicaInfo = datanode.data.append(block, newGs, maxBytesRcvd);
} else {
- replicaInfo = datanode.data.writeToRbw(block, isRecovery);
+ replicaInfo = datanode.data.recoverRbw(
+ block, newGs, minBytesRcvd, maxBytesRcvd);
}
+ block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND:
+ replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
+ if (datanode.blockScanner != null) { // remove from block scanner
+ datanode.blockScanner.deleteBlock(block);
+ }
+ block.setGenerationStamp(newGs);
+ break;
case PIPELINE_SETUP_APPEND_RECOVERY:
- isRecovery = true;
- replicaInfo = datanode.data.append(block);
- finalized = false;
+ replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
+ if (datanode.blockScanner != null) { // remove from block scanner
+ datanode.blockScanner.deleteBlock(block);
+ }
+ block.setGenerationStamp(newGs);
break;
- case PIPELINE_CLOSE_RECOVERY:
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
}
@@ -131,20 +136,24 @@
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.checksumOut,
SMALL_BUFFER_SIZE));
- // If this block is for appends, then remove it from periodic
- // validation.
- if (datanode.blockScanner != null && isRecovery) {
- datanode.blockScanner.deleteBlock(block);
- }
// read checksum meta information
this.checksum = DataChecksum.newDataChecksum(in);
this.bytesPerChecksum = checksum.getBytesPerChecksum();
this.checksumSize = checksum.getChecksumSize();
+
+ // write data chunk header if creating a new replica
+ if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE
+ || clientName.length() == 0) {
+ BlockMetadataHeader.writeHeader(checksumOut, checksum);
+ } else {
+ datanode.data.setChannelPosition(block, streams, 0,
+ BlockMetadataHeader.getHeaderSize());
+ }
}
- } catch (BlockAlreadyExistsException bae) {
+ } catch (ReplicaAlreadyExistsException bae) {
throw bae;
- } catch (BlockNotFoundException bne) {
+ } catch (ReplicaNotFoundException bne) {
throw bne;
} catch(IOException ioe) {
IOUtils.closeStream(this);
@@ -494,7 +503,7 @@
}
try {
- if (!finalized && replicaInfo.getBytesOnDisk()<offsetInBlock) {
+ if (replicaInfo.getBytesOnDisk()<offsetInBlock) {
//finally write to the disk :
setBlockPosition(offsetInBlock-len);
@@ -561,10 +570,6 @@
throttler = throttlerArg;
try {
- // write data chunk header
- if (!finalized) {
- BlockMetadataHeader.writeHeader(checksumOut, checksum);
- }
if (clientName.length() > 0) {
responder = new Daemon(datanode.threadGroup,
new PacketResponder(this, block, mirrIn,
@@ -644,21 +649,6 @@
* Sets the file pointer in the local block file to the specified value.
*/
private void setBlockPosition(long offsetInBlock) throws IOException {
- if (finalized) {
- if (!isRecovery) {
- throw new IOException("Write to offset " + offsetInBlock +
- " of block " + block +
- " that is already finalized.");
- }
- if (offsetInBlock > datanode.data.getLength(block)) {
- throw new IOException("Write to offset " + offsetInBlock +
- " of block " + block +
- " that is already finalized and is of size " +
- datanode.data.getLength(block));
- }
- return;
- }
-
if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) {
return; // nothing to do
}
@@ -852,26 +842,24 @@
// If this is the last packet in block, then close block
// file and finalize the block before responding success
if (pkt.lastPacketInBlock) {
- if (!receiver.finalized) {
- receiver.close();
- final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
- block.setNumBytes(replicaInfo.getNumBytes());
- datanode.data.finalizeBlock(block);
- datanode.myMetrics.blocksWritten.inc();
- datanode.notifyNamenodeReceivedBlock(block,
- DataNode.EMPTY_DEL_HINT);
- if (ClientTraceLog.isInfoEnabled() &&
- receiver.clientName.length() > 0) {
- long offset = 0;
- ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
- receiver.inAddr, receiver.myAddr, block.getNumBytes(),
- "HDFS_WRITE", receiver.clientName, offset,
- datanode.dnRegistration.getStorageID(), block, endTime-startTime));
- } else {
- LOG.info("Received block " + block +
- " of size " + block.getNumBytes() +
- " from " + receiver.inAddr);
- }
+ receiver.close();
+ final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
+ block.setNumBytes(replicaInfo.getNumBytes());
+ datanode.data.finalizeBlock(block);
+ datanode.myMetrics.blocksWritten.inc();
+ datanode.notifyNamenodeReceivedBlock(block,
+ DataNode.EMPTY_DEL_HINT);
+ if (ClientTraceLog.isInfoEnabled() &&
+ receiver.clientName.length() > 0) {
+ long offset = 0;
+ ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
+ receiver.inAddr, receiver.myAddr, block.getNumBytes(),
+ "HDFS_WRITE", receiver.clientName, offset,
+ datanode.dnRegistration.getStorageID(), block, endTime-startTime));
+ } else {
+ LOG.info("Received block " + block +
+ " of size " + block.getNumBytes() +
+ " from " + receiver.inAddr);
}
lastPacket = true;
}
@@ -879,6 +867,9 @@
replyOut.writeLong(expected);
SUCCESS.write(replyOut);
replyOut.flush();
+ if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+ replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+ }
} catch (Exception e) {
LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
if (running) {
@@ -911,6 +902,7 @@
}
boolean lastPacketInBlock = false;
+ Packet pkt = null;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (running && datanode.shouldRun && !lastPacketInBlock) {
@@ -933,7 +925,6 @@
} else {
LOG.debug("PacketResponder " + numTargets + " got seqno = " +
seqno);
- Packet pkt = null;
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
@@ -951,9 +942,6 @@
}
pkt = ackQueue.removeFirst();
expected = pkt.seqno;
- if (pkt.lastByteInBlock > replicaInfo.getBytesAcked()) {
- replicaInfo.setBytesAcked(pkt.lastByteInBlock);
- }
notifyAll();
LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
if (seqno != expected) {
@@ -994,7 +982,7 @@
// If this is the last packet in block, then close block
// file and finalize the block before responding success
- if (lastPacketInBlock && !receiver.finalized) {
+ if (lastPacketInBlock) {
receiver.close();
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes());
@@ -1025,12 +1013,14 @@
" responded my status " +
" for seqno " + expected);
+ boolean success = true;
// forward responses from downstream datanodes.
for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
try {
if (op == SUCCESS) {
op = Status.read(mirrorIn);
if (op != SUCCESS) {
+ success = false;
LOG.debug("PacketResponder for block " + block +
": error code received from downstream " +
" datanode[" + i + "] " + op);
@@ -1038,6 +1028,7 @@
}
} catch (Throwable e) {
op = ERROR;
+ success = false;
}
op.write(replyOut);
}
@@ -1045,6 +1036,10 @@
LOG.debug("PacketResponder " + block + " " + numTargets +
" responded other status " + " for seqno " + expected);
+ if (pkt != null && success &&
+ pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+ replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+ }
// If we were unable to read the seqno from downstream, then stop.
if (expected == -2) {
running = false;
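The constructor changes above boil down to a mapping from construction stage to dataset operation. Restated as a bare switch for client-written replicas (generation-stamp bumping, block-scanner removal, and error handling elided):

    switch (stage) {
    case PIPELINE_SETUP_CREATE:             // fresh write: brand-new RBW replica
      replicaInfo = datanode.data.createRbw(block);
      break;
    case PIPELINE_SETUP_STREAMING_RECOVERY: // pipeline failed mid-stream
      replicaInfo = datanode.data.isValidBlock(block)
          ? datanode.data.append(block, newGs, maxBytesRcvd)  // already finalized
          : datanode.data.recoverRbw(block, newGs, minBytesRcvd, maxBytesRcvd);
      break;
    case PIPELINE_SETUP_APPEND:             // first attempt to append
      replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
      break;
    case PIPELINE_SETUP_APPEND_RECOVERY:    // retry after a failed append
      replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
      break;
    default:
      throw new IOException("Unsupported stage " + stage);
    }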
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Sep 24 20:51:38 2009
@@ -1094,36 +1094,61 @@
}
@Override // FSDatasetInterface
- public ReplicaInPipelineInterface append(Block b)
- throws IOException {
+ public synchronized ReplicaInPipelineInterface append(Block b,
+ long newGS, long expectedBlockLen) throws IOException {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
// some of the packets were not received by the client. The client
// re-opens the connection and retries sending those packets.
// The other reason is that an "append" is occurring to this block.
- ReplicaInfo replicaInfo = volumeMap.get(b);
// check the validity of the parameter
+ if (newGS < b.getGenerationStamp()) {
+ throw new IOException("The new generation stamp " + newGS +
+ " should be greater than the replica " + b + "'s generation stamp");
+ }
+ ReplicaInfo replicaInfo = volumeMap.get(b);
if (replicaInfo == null) {
- throw new BlockNotFoundException(
- BlockNotFoundException.NON_EXISTENT_REPLICA + b);
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
}
+ DataNode.LOG.info("Appending to replica " + replicaInfo);
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
- throw new BlockNotFoundException(
- BlockNotFoundException.UNFINALIZED_REPLICA + b);
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
+ }
+ if (replicaInfo.getNumBytes() != expectedBlockLen) {
+ throw new IOException("Corrupted replica " + replicaInfo +
+ " with a length of " + replicaInfo.getNumBytes() +
+ " expected length is " + expectedBlockLen);
}
-
- DataNode.LOG.info("Reopen Block for append " + b);
+ return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+ }
+
+ /** Append to a finalized replica
+ * Change a finalized replica to be a RBW replica and
+ * bump its generation stamp to be the newGS
+ *
+ * @param replicaInfo a finalized replica
+ * @param newGS new generation stamp
+ * @param estimateBlockLen the estimated block length
+ * @return a RBW replica
+ * @throws IOException if moving the replica from finalized directory
+ * to rbw directory fails
+ */
+ private synchronized ReplicaBeingWritten append(FinalizedReplica replicaInfo,
+ long newGS, long estimateBlockLen) throws IOException {
// unlink the finalized replica
replicaInfo.detachBlock(1);
- // construct a RBW replica
+ // construct a RBW replica with the new GS
File blkfile = replicaInfo.getBlockFile();
- FSVolume v = volumes.getNextVolume(b.getNumBytes());
- File newBlkFile = v.createRbwFile(b);
+ FSVolume v = volumes.getNextVolume(estimateBlockLen);
+ File newBlkFile = v.createRbwFile(replicaInfo);
File oldmeta = replicaInfo.getMetaFile();
- ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(replicaInfo,
+ ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
+ replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
v, newBlkFile.getParentFile(), Thread.currentThread());
File newmeta = newReplicaInfo.getMetaFile();
@@ -1132,7 +1157,7 @@
DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
}
if (!oldmeta.renameTo(newmeta)) {
- throw new IOException("Block " + b + " reopen failed. " +
+ throw new IOException("Block " + replicaInfo + " reopen failed. " +
" Unable to move meta file " + oldmeta +
" to rbw dir " + newmeta);
}
@@ -1140,13 +1165,14 @@
// rename block file to rbw directory
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("Renaming " + blkfile + " to " + newBlkFile);
+ DataNode.LOG.debug("Old block file length is " + blkfile.length());
}
if (!blkfile.renameTo(newBlkFile)) {
if (!newmeta.renameTo(oldmeta)) { // restore the meta file
DataNode.LOG.warn("Cannot move meta file " + newmeta +
"back to the finalized directory " + oldmeta);
}
- throw new IOException("Block " + b + " reopen failed. " +
+ throw new IOException("Block " + replicaInfo + " reopen failed. " +
" Unable to move block file " + blkfile +
" to rbw dir " + newBlkFile);
}
@@ -1157,47 +1183,165 @@
return newReplicaInfo;
}
- @Override
- public ReplicaInPipelineInterface writeToRbw(Block b, boolean isRecovery)
- throws IOException {
- ReplicaInfo replicaInfo = volumeMap.get(b);
- ReplicaBeingWritten newReplicaInfo;
- if (replicaInfo == null) { // create a new block
- FSVolume v = volumes.getNextVolume(b.getNumBytes());
- // create a rbw file to hold block in the designated volume
- File f = v.createRbwFile(b);
- newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
- b.getGenerationStamp(), v, f.getParentFile());
- volumeMap.add(newReplicaInfo);
- } else {
- if (!isRecovery) {
- throw new BlockAlreadyExistsException("Block " + b +
- " already exists in state " + replicaInfo.getState() +
- " and thus cannot be created.");
- }
- if (replicaInfo.getState() != ReplicaState.RBW) {
- throw new BlockNotFoundException(
- BlockNotFoundException.NON_RBW_REPLICA + b);
- }
- newReplicaInfo = (ReplicaBeingWritten)replicaInfo;
- synchronized (this) {
- //
- // Is it already in the write process?
- //
- newReplicaInfo.stopWriter();
- newReplicaInfo.setWriter(Thread.currentThread());
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+ long newGS, long expectedBlockLen) throws IOException {
+ DataNode.LOG.info("Recover failed append to " + b);
+
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+ if (replicaInfo == null) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+ }
+
+ // check generation stamp
+ long replicaGenerationStamp = replicaInfo.getGenerationStamp();
+ if (replicaGenerationStamp < b.getGenerationStamp() ||
+ replicaGenerationStamp > newGS) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGenerationStamp
+ + ". Expected GS range is [" + b.getGenerationStamp() + ", " +
+ newGS + "].");
+ }
+
+ // stop the previous writer before check a replica's length
+ long replicaLen = replicaInfo.getNumBytes();
+ if (replicaInfo.getState() == ReplicaState.RBW) {
+ ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+ // kill the previous writer
+ rbw.stopWriter();
+ rbw.setWriter(Thread.currentThread());
+ // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
+ if (replicaLen != rbw.getBytesOnDisk()
+ || replicaLen != rbw.getBytesAcked()) {
+ throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo +
+ "bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" +
+ rbw.getBytesOnDisk() + "), and bytesAcked(" + rbw.getBytesAcked() +
+ ") are not the same.");
}
}
+
+ // check block length
+ if (replicaLen != expectedBlockLen) {
+ throw new IOException("Corrupted replica " + replicaInfo +
+ " with a length of " + replicaLen +
+ " expected length is " + expectedBlockLen);
+ }
+
+ // change the replica's state/gs etc.
+ switch (replicaInfo.getState()) {
+ case FINALIZED:
+ return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+ case RBW:
+ bumpReplicaGS(replicaInfo, newGS);
+ return (ReplicaBeingWritten)replicaInfo;
+ default:
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
+ }
+ }
+
+ /**
+ * Bump a replica's generation stamp to a new one.
+ * Its on-disk meta file name is renamed to be the new one too.
+ *
+ * @param replicaInfo a replica
+ * @param newGS new generation stamp
+ * @throws IOException if rename fails
+ */
+ private void bumpReplicaGS(ReplicaInfo replicaInfo,
+ long newGS) throws IOException {
+ long oldGS = replicaInfo.getGenerationStamp();
+ File oldmeta = replicaInfo.getMetaFile();
+ replicaInfo.setGenerationStamp(newGS);
+ File newmeta = replicaInfo.getMetaFile();
+
+ // rename meta file to new GS
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+ }
+ if (!oldmeta.renameTo(newmeta)) {
+ replicaInfo.setGenerationStamp(oldGS); // restore old GS
+ throw new IOException("Block " + (Block)replicaInfo + " reopen failed. " +
+ " Unable to move meta file " + oldmeta +
+ " to " + newmeta);
+ }
+ }
+ @Override
+ public synchronized ReplicaInPipelineInterface createRbw(Block b)
+ throws IOException {
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+ if (replicaInfo != null) {
+ throw new ReplicaAlreadyExistsException("Block " + b +
+ " already exists in state " + replicaInfo.getState() +
+ " and thus cannot be created.");
+ }
+ // create a new block
+ FSVolume v = volumes.getNextVolume(b.getNumBytes());
+ // create a rbw file to hold block in the designated volume
+ File f = v.createRbwFile(b);
+ ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
+ b.getGenerationStamp(), v, f.getParentFile());
+ volumeMap.add(newReplicaInfo);
return newReplicaInfo;
}
@Override
- public ReplicaInPipelineInterface writeToTemporary(Block b)
+ public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+ long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
- ReplicaInfo replicaInfo = volumeMap.get(b);
+ DataNode.LOG.info("Recover the RBW replica " + b);
+
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+ if (replicaInfo == null) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+ }
+
+ // check the replica's state
+ if (replicaInfo.getState() != ReplicaState.RBW) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
+ }
+ ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+
+ DataNode.LOG.info("Recovering replica " + rbw);
+
+ // Stop the previous writer
+ rbw.stopWriter();
+ rbw.setWriter(Thread.currentThread());
+
+ // check generation stamp
+ long replicaGenerationStamp = rbw.getGenerationStamp();
+ if (replicaGenerationStamp < b.getGenerationStamp() ||
+ replicaGenerationStamp > newGS) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
+ ". Expected GS range is [" + b.getGenerationStamp() + ", " +
+ newGS + "].");
+ }
+
+ // check replica length
+ if (rbw.getBytesAcked() < minBytesRcvd || rbw.getNumBytes() > maxBytesRcvd){
+ throw new ReplicaNotFoundException("Unmatched length replica " +
+ replicaInfo + ": BytesAcked = " + rbw.getBytesAcked() +
+ " BytesRcvd = " + rbw.getNumBytes() + " are not in the range of [" +
+ minBytesRcvd + ", " + maxBytesRcvd + "].");
+ }
+
+ // bump the replica's generation stamp to newGS
+ bumpReplicaGS(rbw, newGS);
+
+ return rbw;
+ }
+
+ @Override
+ public synchronized ReplicaInPipelineInterface createTemporary(Block b)
+ throws IOException {
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
if (replicaInfo != null) {
- throw new BlockAlreadyExistsException("Block " + b +
+ throw new ReplicaAlreadyExistsException("Block " + b +
" already exists in state " + replicaInfo.getState() +
" and thus cannot be created.");
}
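Both recoverAppend and recoverRbw gate on the same generation-stamp window. Stated as a standalone check (a hypothetical helper, not in the patch):

    /**
     * A replica is recoverable only if its GS lies in [oldGS, newGS]: at least
     * as new as the block the client knows about, and no newer than the stamp
     * the NameNode just issued.
     */
    private static void checkGenerationStampRange(long replicaGS, long oldGS,
        long newGS) throws ReplicaNotFoundException {
      if (replicaGS < oldGS || replicaGS > newGS) {
        throw new ReplicaNotFoundException(
            ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGS
            + ". Expected GS range is [" + oldGS + ", " + newGS + "].");
      }
    }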
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Thu Sep 24 20:51:38 2009
@@ -180,35 +180,64 @@
}
/**
- * Creates a temporary replica and returns output streams to write data and CRC
+ * Creates a temporary replica and returns the meta information of the replica
*
* @param b block
- * @return the meata info of the replica which is being written to
+ * @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public ReplicaInPipelineInterface writeToTemporary(Block b) throws IOException;
+ public ReplicaInPipelineInterface createTemporary(Block b)
+ throws IOException;
/**
- * Creates/recovers a RBW replica and returns output streams to
- * write data and CRC
+ * Creates a RBW replica and returns the meta info of the replica
*
* @param b block
- * @param isRecovery True if this is part of error recovery, otherwise false
- * @return the meata info of the replica which is being written to
+ * @return the meta info of the replica which is being written to
+ * @throws IOException if an error occurs
+ */
+ public ReplicaInPipelineInterface createRbw(Block b) throws IOException;
+
+ /**
+ * Recovers a RBW replica and returns the meta info of the replica
+ *
+ * @param b block
+ * @param newGS the new generation stamp for the replica
+ * @param minBytesRcvd the minimum number of bytes that the replica could have
+ * @param maxBytesRcvd the maximum number of bytes that the replica could have
+ * @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public ReplicaInPipelineInterface writeToRbw(Block b, boolean isRecovery)
+ public ReplicaInPipelineInterface recoverRbw(Block b,
+ long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException;
/**
- * Append to a finalized replica and returns output streams to write data and CRC
+ * Append to a finalized replica and returns the meta info of the replica
+ *
* @param b block
+ * @param newGS the new generation stamp for the replica
+ * @param expectedBlockLen the number of bytes the replica is expected to have
* @return the meta info of the replica which is being written to
* @throws IOException
*/
- public ReplicaInPipelineInterface append(Block b) throws IOException;
+ public ReplicaInPipelineInterface append(Block b,
+ long newGS, long expectedBlockLen) throws IOException;
/**
+ * Recover a failed append to a finalized replica
+ * and returns the meta info of the replica
+ *
+ * @param b block
+ * @param newGS the new generation stamp for the replica
+ * @param expectedBlockLen the number of bytes the replica is expected to have
+ * @return the meta info of the replica which is being written to
+ * @throws IOException
+ */
+ public ReplicaInPipelineInterface recoverAppend(Block b,
+ long newGS, long expectedBlockLen) throws IOException;
+
+ /**
* Update the block to the new generation stamp and length.
*/
public void updateBlock(Block oldblock, Block newblock) throws IOException;
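For reference, the write-path surface of FSDatasetInterface after this patch, gathered in one place (signatures as in the diff; the grouping type name is illustrative only):

    interface WritePathOperations {  // illustrative grouping, not a real type
      ReplicaInPipelineInterface createTemporary(Block b) throws IOException;
      ReplicaInPipelineInterface createRbw(Block b) throws IOException;
      ReplicaInPipelineInterface recoverRbw(Block b, long newGS,
          long minBytesRcvd, long maxBytesRcvd) throws IOException;
      ReplicaInPipelineInterface append(Block b, long newGS,
          long expectedBlockLen) throws IOException;
      ReplicaInPipelineInterface recoverAppend(Block b, long newGS,
          long expectedBlockLen) throws IOException;
    }

The create* methods refuse to overwrite an existing replica (ReplicaAlreadyExistsException), while append and the recover* methods require one (ReplicaNotFoundException).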
Copied: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java (from r818583, hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockAlreadyExistsException.java)
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java?p2=hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java&p1=hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockAlreadyExistsException.java&r1=818583&r2=818622&rev=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockAlreadyExistsException.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java Thu Sep 24 20:51:38 2009
@@ -25,14 +25,14 @@
* Exception indicating that the target block already exists
* and is not set to be recovered/overwritten.
*/
-class BlockAlreadyExistsException extends IOException {
+class ReplicaAlreadyExistsException extends IOException {
private static final long serialVersionUID = 1L;
- public BlockAlreadyExistsException() {
+ public ReplicaAlreadyExistsException() {
super();
}
- public BlockAlreadyExistsException(String msg) {
+ public ReplicaAlreadyExistsException(String msg) {
super(msg);
}
}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java Thu Sep 24 20:51:38 2009
@@ -15,6 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
@@ -22,12 +24,30 @@
import org.apache.hadoop.hdfs.protocol.Block;
/**
- * Exception indicating that a replica is not found.
+ * Exception indicating that DataNode does not have a replica
+ * that matches the target block.
*/
class ReplicaNotFoundException extends IOException {
private static final long serialVersionUID = 1L;
+ final static String NON_RBW_REPLICA = "Cannot recover a non-RBW replica ";
+ final static String UNFINALIZED_REPLICA =
+ "Cannot append to an unfinalized replica ";
+ final static String UNFINALIZED_AND_NONRBW_REPLICA =
+ "Cannot recover appending to a replica that's not FINALIZED and not RBW ";
+ final static String NON_EXISTENT_REPLICA =
+ "Cannot append to a non-existent replica ";
+ final static String UNEXPECTED_GS_REPLICA =
+ "Cannot append to a replica with unexpeted generation stamp ";
+
+ public ReplicaNotFoundException() {
+ super();
+ }
ReplicaNotFoundException(Block b) {
super("Replica not found for " + b);
}
-}
\ No newline at end of file
+
+ public ReplicaNotFoundException(String msg) {
+ super(msg);
+ }
+}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Sep 24 20:51:38 2009
@@ -3753,24 +3753,13 @@
return nextGenerationStamp();
}
- /**
- * Get a new generation stamp together with an access token for
- * a block under construction
- *
- * This method is called for recovering a failed pipeline or setting up
- * a pipeline to append to a block.
- *
- * @param block a block
- * @param clientName the name of a client
- * @return a located block with a new generation stamp and an access token
- * @throws IOException if any error occurs
- */
- synchronized LocatedBlock getNewStampForPipeline(Block block, String clientName)
+
+ private INodeFileUnderConstruction checkUCBlock(Block block, String clientName)
throws IOException {
// check safe mode
if (isInSafeMode())
throw new SafeModeException("Cannot get a new generation stamp and an " +
- "access token for block " + block, safeMode);
+ "access token for block " + block, safeMode);
// check stored block state
BlockInfo storedBlock = blockManager.getStoredBlock(block);
@@ -3794,6 +3783,26 @@
" is accessed by a non lease holder " + clientName);
}
+ return pendingFile;
+ }
+
+ /**
+ * Get a new generation stamp together with an access token for
+ * a block under construction
+ *
+ * This method is called for recovering a failed pipeline or setting up
+ * a pipeline to append to a block.
+ *
+ * @param block a block
+ * @param clientName the name of a client
+ * @return a located block with a new generation stamp and an access token
+ * @throws IOException if any error occurs
+ */
+ synchronized LocatedBlock updateBlockForPipeline(Block block,
+ String clientName) throws IOException {
+ // check validity of parameters
+ checkUCBlock(block, clientName);
+
// get a new generation stamp and an access token
block.setGenerationStamp(nextGenerationStamp());
LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
@@ -3804,6 +3813,72 @@
return locatedBlock;
}
+
+ /**
+ * Update a pipeline for a block under construction
+ *
+ * @param clientName the name of the client
+ * @param oldBlock the old block
+ * @param newBlock a new block with a new generation stamp and length
+ * @param newNodes datanodes in the pipeline
+ * @throws IOException if any error occurs
+ */
+ synchronized void updatePipeline(String clientName, Block oldBlock,
+ Block newBlock, DatanodeID[] newNodes)
+ throws IOException {
+ LOG.info("updatePipeline(block=" + oldBlock
+ + ", newGenerationStamp=" + newBlock.getGenerationStamp()
+ + ", newLength=" + newBlock.getNumBytes()
+ + ", newNodes=" + Arrays.asList(newNodes)
+ + ", clientName=" + clientName
+ + ")");
+
+ // check the validity of the block and lease holder name
+ final INodeFileUnderConstruction pendingFile =
+ checkUCBlock(oldBlock, clientName);
+ final BlockInfo oldblockinfo = pendingFile.getLastBlock();
+
+ // check new GS & length: this is not expected
+ if (newBlock.getGenerationStamp() <= oldblockinfo.getGenerationStamp() ||
+ newBlock.getNumBytes() < oldblockinfo.getNumBytes()) {
+ String msg = "Update " + oldBlock + " (len = " +
+ oldblockinfo.getNumBytes() + ") to an older state: " + newBlock +
+ " (len = " + newBlock.getNumBytes() +")";
+ LOG.warn(msg);
+ throw new IOException(msg);
+ }
+
+ // Remove old block from blocks map. This always has to be done
+ // because the generation stamp of this block is changing.
+ blockManager.removeBlockFromMap(oldblockinfo);
+
+ // update last block, construct newblockinfo and add it to the blocks map
+ BlockInfoUnderConstruction newblockinfo =
+ new BlockInfoUnderConstruction(
+ newBlock, pendingFile.getReplication());
+ blockManager.addINode(newblockinfo, pendingFile);
+
+ // find the DatanodeDescriptor objects
+ DatanodeDescriptor[] descriptors = null;
+ if (newNodes.length > 0) {
+ descriptors = new DatanodeDescriptor[newNodes.length];
+ for(int i = 0; i < newNodes.length; i++) {
+ descriptors[i] = getDatanode(newNodes[i]);
+ }
+ }
+ // add locations into the INodeUnderConstruction
+ pendingFile.setLastBlock(newblockinfo, descriptors);
+
+ // persist blocks only if append is supported
+ String src = leaseManager.findPath(pendingFile);
+ if (supportAppends) {
+ dir.persistBlocks(src, pendingFile);
+ getEditLog().logSync();
+ }
+ LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
+ return;
+ }
+
// rename was successful. If any part of the renamed subtree had
// files that were being written to, update with new filename.
//
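The sanity check at the heart of updatePipeline reads as a single predicate (a re-statement using the same Block accessors as above, not code from the patch):

    // updatePipeline accepts newBlock only if it strictly advances the
    // generation stamp and does not shrink the block.
    private static boolean isValidPipelineUpdate(Block oldBlock, Block newBlock) {
      return newBlock.getGenerationStamp() > oldBlock.getGenerationStamp()
          && newBlock.getNumBytes() >= oldBlock.getNumBytes();
    }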
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Sep 24 20:51:38 2009
@@ -672,11 +672,19 @@
}
@Override
- public LocatedBlock getNewStampForPipeline(Block block, String clientName)
+ public LocatedBlock updateBlockForPipeline(Block block, String clientName)
throws IOException {
- return namesystem.getNewStampForPipeline(block, clientName);
+ return namesystem.updateBlockForPipeline(block, clientName);
}
+
+ @Override
+ public void updatePipeline(String clientName, Block oldBlock,
+ Block newBlock, DatanodeID[] newNodes)
+ throws IOException {
+ namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
+ }
+
/** {@inheritDoc} */
public void commitBlockSynchronization(Block block,
long newgenerationstamp, long newlength,
Modified: hadoop/hdfs/branches/HDFS-265/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj Thu Sep 24 20:51:38 2009
@@ -35,7 +35,7 @@
before(DataStreamer datastreamer) : callCreateBlockOutputStream(datastreamer) {
Assert.assertFalse(datastreamer.hasError);
- Assert.assertEquals(0, datastreamer.errorIndex);
+ Assert.assertEquals(-1, datastreamer.errorIndex);
}
pointcut pipelineInitNonAppend(DataStreamer datastreamer):
@@ -66,14 +66,12 @@
+ " errorIndex=" + datastreamer.errorIndex);
}
- pointcut pipelineErrorAfterInit(boolean onError, boolean isAppend,
- DataStreamer datastreamer):
- call(* processDatanodeError(boolean, boolean))
- && args(onError, isAppend)
- && target(datastreamer)
- && if(onError && !isAppend);
+ pointcut pipelineErrorAfterInit(DataStreamer datastreamer):
+ call(* processDatanodeError())
+ && within (DFSClient.DFSOutputStream.DataStreamer)
+ && target(datastreamer);
- before(DataStreamer datastreamer) : pipelineErrorAfterInit(boolean, boolean, datastreamer) {
+ before(DataStreamer datastreamer) : pipelineErrorAfterInit(datastreamer) {
LOG.info("FI: before pipelineErrorAfterInit: errorIndex="
+ datastreamer.errorIndex);
try {
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java Thu Sep 24 20:51:38 2009
@@ -55,7 +55,7 @@
// test getNewStampAndToken on a finalized block
try {
- namenode.getNewStampForPipeline(firstBlock, "");
+ namenode.updateBlockForPipeline(firstBlock, "");
Assert.fail("Can not get a new GS from a finalized block");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains("is not under Construction"));
@@ -66,7 +66,7 @@
long newBlockId = firstBlock.getBlockId() + 1;
Block newBlock = new Block(newBlockId, 0,
firstBlock.getGenerationStamp());
- namenode.getNewStampForPipeline(newBlock, "");
+ namenode.updateBlockForPipeline(newBlock, "");
Assert.fail("Cannot get a new GS from a non-existent block");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains("does not exist"));
@@ -92,7 +92,7 @@
// test non-lease holder
DFSClient dfs = ((DistributedFileSystem)fileSys).dfs;
try {
- namenode.getNewStampForPipeline(firstBlock, "test" + dfs.clientName);
+ namenode.updateBlockForPipeline(firstBlock, "test" + dfs.clientName);
Assert.fail("Cannot get a new GS for a non lease holder");
} catch (LeaseExpiredException e) {
Assert.assertTrue(e.getMessage().startsWith("Lease mismatch"));
@@ -100,14 +100,14 @@
// test null lease holder
try {
- namenode.getNewStampForPipeline(firstBlock, null);
+ namenode.updateBlockForPipeline(firstBlock, null);
Assert.fail("Cannot get a new GS for a null lease holder");
} catch (LeaseExpiredException e) {
Assert.assertTrue(e.getMessage().startsWith("Lease mismatch"));
}
// test updateBlockForPipeline on a RBW block
- namenode.getNewStampForPipeline(firstBlock, dfs.clientName);
+ namenode.updateBlockForPipeline(firstBlock, dfs.clientName);
} finally {
IOUtils.closeStream(out);
}
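
As the renames above show, updateBlockForPipeline succeeds only when the caller holds the lease and the block is under construction; on success, the returned LocatedBlock carries the block with its freshly bumped generation stamp. A sketch of the success path, reusing the test's namenode proxy and dfs client (the assertion is illustrative):

    LocatedBlock lb = namenode.updateBlockForPipeline(firstBlock, dfs.clientName);
    long newGS = lb.getBlock().getGenerationStamp();
    Assert.assertTrue(newGS > firstBlock.getGenerationStamp());
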
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Thu Sep 24 20:51:38 2009
@@ -215,9 +215,12 @@
public void setTimes(String src, long mtime, long atime) throws IOException {}
- public LocatedBlock getNewStampForPipeline(Block block, String clientName)
- throws IOException { return null; }
+ @Override public LocatedBlock updateBlockForPipeline(Block block,
+ String clientName) throws IOException { return null; }
+ @Override public void updatePipeline(String clientName, Block oldblock,
+ Block newBlock, DatanodeID[] newNodes)
+ throws IOException {}
}
public void testNotYetReplicatedErrors() throws IOException
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Thu Sep 24 20:51:38 2009
@@ -46,6 +46,7 @@
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -165,13 +166,13 @@
sendRecvData(description, false);
}
- private void testWrite(Block block, BlockConstructionStage stage,
+ private void testWrite(Block block, BlockConstructionStage stage, long newGS,
String description, Boolean eofExcepted) throws IOException {
sendBuf.reset();
recvBuf.reset();
DataTransferProtocol.Sender.opWriteBlock(sendOut,
block.getBlockId(), block.getGenerationStamp(), 0,
- stage, 0L, 0L, 0L, "cl", null,
+ stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
new DatanodeInfo[1], AccessToken.DUMMY_TOKEN);
if (eofExcepted) {
ERROR.write(recvOut);
@@ -198,23 +199,29 @@
// get the first blockid for the file
Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
// test PIPELINE_SETUP_CREATE on a finalized block
- testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE,
+ testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
"Cannot create an existing block", true);
// test DATA_STREAMING on a finalized block
- testWrite(firstBlock, BlockConstructionStage.DATA_STREAMING,
+ testWrite(firstBlock, BlockConstructionStage.DATA_STREAMING, 0L,
"Unexpected stage", true);
// test PIPELINE_SETUP_STREAMING_RECOVERY on an existing block
+ long newGS = firstBlock.getGenerationStamp() + 1;
testWrite(firstBlock,
- BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
- "Successful for now", false);
+ BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
+ newGS, "Successful for now", false);
+ firstBlock.setGenerationStamp(newGS);
// test PIPELINE_SETUP_APPEND on an existing block
+ newGS = firstBlock.getGenerationStamp() + 1;
testWrite(firstBlock,
BlockConstructionStage.PIPELINE_SETUP_APPEND,
- "Append to a finalized replica", false);
- // test PIPELINE_SETUP_APPEND on an existing block
+ newGS, "Append to a finalized replica", false);
+ firstBlock.setGenerationStamp(newGS);
+ // test PIPELINE_SETUP_APPEND_RECOVERY on an existing block
+ newGS = firstBlock.getGenerationStamp() + 1;
testWrite(firstBlock,
- BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY,
+ BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS,
"Recover appending to a finalized replica", false);
+ firstBlock.setGenerationStamp(newGS);
/* Test writing to a new block */
long newBlockId = firstBlock.getBlockId() + 1;
@@ -222,48 +229,58 @@
firstBlock.getGenerationStamp());
// test PIPELINE_SETUP_CREATE on a new block
- testWrite(newBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE,
+ testWrite(newBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
"Create a new block", false);
// test PIPELINE_SETUP_STREAMING_RECOVERY on a new block
+ newGS = newBlock.getGenerationStamp() + 1;
newBlock.setBlockId(newBlock.getBlockId()+1);
testWrite(newBlock,
- BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
- "Recover a new block", false);
+ BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, newGS,
+ "Recover a new block", true);
+
// test PIPELINE_SETUP_APPEND on a new block
- newBlock.setBlockId(newBlock.getBlockId()+1);
+ newGS = newBlock.getGenerationStamp() + 1;
testWrite(newBlock,
- BlockConstructionStage.PIPELINE_SETUP_APPEND,
+ BlockConstructionStage.PIPELINE_SETUP_APPEND, newGS,
"Cannot append to a new block", true);
+
// test PIPELINE_SETUP_APPEND_RECOVERY on a new block
newBlock.setBlockId(newBlock.getBlockId()+1);
+ newGS = newBlock.getGenerationStamp() + 1;
testWrite(newBlock,
- BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY,
+ BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS,
"Cannot append to a new block", true);
/* Test writing to RBW replicas */
- // change first block to a RBW
- DFSOutputStream out = (DFSOutputStream)(fileSys.append(file).
+ Path file1 = new Path("dataprotocol1.dat");
+ DFSTestUtil.createFile(fileSys, file1, 1L, (short)numDataNodes, 0L);
+ DFSOutputStream out = (DFSOutputStream)(fileSys.append(file1).
getWrappedStream());
out.write(1);
out.hflush();
- FSDataInputStream in = fileSys.open(file);
+ FSDataInputStream in = fileSys.open(file1);
firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
+ firstBlock.setNumBytes(2L);
try {
// test PIPELINE_SETUP_CREATE on a RBW block
- testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE,
+ testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
"Cannot create a RBW block", true);
// test PIPELINE_SETUP_APPEND on a RBW replica
+ newGS = firstBlock.getGenerationStamp() + 1;
testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND,
- "Cannot append to a RBW replica", true);
+ newGS, "Cannot append to a RBW replica", true);
// test PIPELINE_SETUP_APPEND_RECOVERY on a RBW replica
testWrite(firstBlock,
BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY,
- "Cannot append to a RBW replica", true);
+ newGS, "Recover append to a RBW replica", false);
+ firstBlock.setGenerationStamp(newGS);
// test PIPELINE_SETUP_STREAMING_RECOVERY on a RBW block
+ newGS = firstBlock.getGenerationStamp() + 1;
testWrite(firstBlock,
BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
- "Recover a RBW replica", false);
+ newGS, "Recover a RBW replica", false);
+ firstBlock.setGenerationStamp(newGS);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
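
Read as a whole, the updated expectations encode a small state machine over (replica state, construction stage). A comment-style summary of the cases exercised above:

    // replica state x setup stage -> expected outcome (per the tests above)
    //   finalized + PIPELINE_SETUP_CREATE             -> rejected
    //   finalized + DATA_STREAMING                    -> rejected
    //   finalized + PIPELINE_SETUP_STREAMING_RECOVERY -> accepted, GS bumped
    //   finalized + PIPELINE_SETUP_APPEND             -> accepted, GS bumped
    //   finalized + PIPELINE_SETUP_APPEND_RECOVERY    -> accepted, GS bumped
    //   absent    + PIPELINE_SETUP_CREATE             -> accepted
    //   absent    + PIPELINE_SETUP_STREAMING_RECOVERY -> rejected
    //   absent    + PIPELINE_SETUP_APPEND(_RECOVERY)  -> rejected
    //   rbw       + PIPELINE_SETUP_CREATE             -> rejected
    //   rbw       + PIPELINE_SETUP_APPEND             -> rejected
    //   rbw       + PIPELINE_SETUP_APPEND_RECOVERY    -> accepted, GS bumped
    //   rbw       + PIPELINE_SETUP_STREAMING_RECOVERY -> accepted, GS bumped
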
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java Thu Sep 24 20:51:38 2009
@@ -37,7 +37,7 @@
/** This class implements some of tests posted in HADOOP-2658. */
public class TestFileAppend3 extends junit.framework.TestCase {
- static final long BLOCK_SIZE = 64 * 1024;
+ static final long BLOCK_SIZE = 3 * 64 * 1024;
static final short REPLICATION = 3;
static final int DATANODE_NUM = 5;
@@ -220,6 +220,7 @@
FSDataOutputStream out = fs.append(p);
final int len2 = (int)BLOCK_SIZE/2;
AppendTestUtil.write(out, len1, len2);
+ out.sync();
//c. Rename file to file.new.
final Path pnew = new Path(p + ".new");
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Thu Sep 24 20:51:38 2009
@@ -467,10 +467,11 @@
}
@Override
- public ReplicaInPipelineInterface append(Block b) throws IOException {
+ public synchronized ReplicaInPipelineInterface append(Block b,
+ long newGS, long expectedBlockLen) throws IOException {
BInfo binfo = blockMap.get(b);
if (binfo == null || !binfo.isFinalized()) {
- throw new BlockNotFoundException("Block " + b
+ throw new ReplicaNotFoundException("Block " + b
+ " is not valid, and cannot be appended to.");
}
binfo.unfinalizeBlock();
@@ -478,30 +479,51 @@
}
@Override
- public synchronized ReplicaInPipelineInterface writeToRbw(Block b,
- boolean isRecovery) throws IOException {
- if (isValidBlock(b)) {
- throw new BlockAlreadyExistsException("Block " + b
- + " is valid, and cannot be written to.");
+ public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+ long newGS, long expectedBlockLen) throws IOException {
+ BInfo binfo = blockMap.get(b);
+ if (binfo == null) {
+ throw new ReplicaNotFoundException("Block " + b
+ + " is not valid, and cannot be appended to.");
}
+ if (binfo.isFinalized()) {
+ binfo.unfinalizeBlock();
+ }
+ binfo.theBlock.setGenerationStamp(newGS);
+ return binfo;
+ }
+
+ @Override
+ public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+ long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
BInfo binfo = blockMap.get(b);
- if (isRecovery && binfo != null) {
- return binfo;
+ if (binfo == null) {
+ throw new ReplicaNotFoundException("Block " + b
+ + " does not exist, and cannot be appended to.");
}
- binfo = new BInfo(b, true);
- blockMap.put(binfo.theBlock, binfo);
+ if (binfo.isFinalized()) {
+ throw new ReplicaAlreadyExistsException("Block " + b
+ + " is valid, and cannot be written to.");
+ }
+ binfo.theBlock.setGenerationStamp(newGS);
return binfo;
}
@Override
- public synchronized ReplicaInPipelineInterface writeToTemporary(Block b)
+ public synchronized ReplicaInPipelineInterface createRbw(Block b)
+ throws IOException {
+ return createTemporary(b);
+ }
+
+ @Override
+ public synchronized ReplicaInPipelineInterface createTemporary(Block b)
throws IOException {
if (isValidBlock(b)) {
- throw new BlockAlreadyExistsException("Block " + b +
+ throw new ReplicaAlreadyExistsException("Block " + b +
" is valid, and cannot be written to.");
}
if (isBeingWritten(b)) {
- throw new BlockAlreadyExistsException("Block " + b +
+ throw new ReplicaAlreadyExistsException("Block " + b +
" is being written, and cannot be written to.");
}
BInfo binfo = new BInfo(b, true);
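
With writeToRbw/writeToTemporary gone, the dataset exposes one entry point per pipeline situation, and the simulated implementation mirrors that contract: create* for brand-new replicas, append/recoverAppend for finalized ones, recoverRbw for an interrupted write. A sketch of how a receiver might dispatch on the construction stage, using only the signatures shown above; the dispatcher itself is illustrative, not the actual BlockReceiver code:

    // "dataset" is an FSDatasetInterface; the long arguments come from
    // the write-request header.
    static ReplicaInPipelineInterface setupReplica(FSDatasetInterface dataset,
        Block block, BlockConstructionStage stage, long newGS,
        long minBytesRcvd, long maxBytesRcvd) throws IOException {
      switch (stage) {
      case PIPELINE_SETUP_CREATE:              // brand-new block from a client
        return dataset.createRbw(block);
      case PIPELINE_SETUP_STREAMING_RECOVERY:  // resume an interrupted write
        return dataset.recoverRbw(block, newGS, minBytesRcvd, maxBytesRcvd);
      case PIPELINE_SETUP_APPEND:              // reopen a finalized replica
        return dataset.append(block, newGS, block.getNumBytes());
      case PIPELINE_SETUP_APPEND_RECOVERY:     // retry a failed append setup
        return dataset.recoverAppend(block, newGS, block.getNumBytes());
      default:                                 // e.g. a replication transfer
        return dataset.createTemporary(block);
      }
    }
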
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java?rev=818622&r1=818621&r2=818622&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java Thu Sep 24 20:51:38 2009
@@ -64,7 +64,7 @@
int bytesAdded = 0;
for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) {
Block b = new Block(i, 0, 0); // we pass expected len as zero; fsdataset should use the size of the actual data written
- ReplicaInPipelineInterface bInfo = fsdataset.writeToRbw(b, false);
+ ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
BlockWriteStreams out = bInfo.createStreams();
try {
OutputStream dataOut = out.dataOut;