Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/10/01 00:57:34 UTC
svn commit: r820487 [2/6] - in /hadoop/hdfs/branches/branch-0.21: ./
.eclipse.templates/.launches/ lib/ src/contrib/block_forensics/
src/contrib/block_forensics/client/ src/contrib/block_forensics/ivy/
src/contrib/block_forensics/src/ src/contrib/block...
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Wed Sep 30 22:57:30 2009
@@ -25,6 +25,7 @@
import java.io.OutputStream;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.AccessToken;
/**
@@ -38,12 +39,12 @@
* when protocol changes. It is not very obvious.
*/
/*
- * Version 16:
- * Datanode now needs to send back a status code together
- * with firstBadLink during pipeline setup for dfs write
- * (only for DFSClients, not for other datanodes).
+ * Version 17:
+ * Change the block write protocol to support pipeline recovery.
+ * Additional fields, like recovery flags, new GS, minBytesRcvd,
+ * and maxBytesRcvd are included.
*/
- public static final int DATA_TRANSFER_VERSION = 16;
+ public static final int DATA_TRANSFER_VERSION = 17;
/** Operation */
public enum Op {
@@ -119,6 +120,55 @@
}
};
+ public enum BlockConstructionStage {
+ /** The enum constants are always listed as a regular stage followed by its
+ * recovery stage.
+ * Changing this order will break getRecoveryStage().
+ */
+ // pipeline set up for block append
+ PIPELINE_SETUP_APPEND,
+ // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
+ PIPELINE_SETUP_APPEND_RECOVERY,
+ // data streaming
+ DATA_STREAMING,
+ // pipeline setup for failed data streaming recovery
+ PIPELINE_SETUP_STREAMING_RECOVERY,
+ // close the block and pipeline
+ PIPELINE_CLOSE,
+ // Recover a failed PIPELINE_CLOSE
+ PIPELINE_CLOSE_RECOVERY,
+ // pipeline set up for block creation
+ PIPELINE_SETUP_CREATE;
+
+ final static private byte RECOVERY_BIT = (byte)1;
+
+ /**
+ * get the recovery stage of this stage
+ */
+ public BlockConstructionStage getRecoveryStage() {
+ if (this == PIPELINE_SETUP_CREATE) {
+ throw new IllegalArgumentException( "Unexpected blockStage " + this);
+ } else {
+ return values()[ordinal()|RECOVERY_BIT];
+ }
+ }
+
+ private static BlockConstructionStage valueOf(byte code) {
+ return code < 0 || code >= values().length? null: values()[code];
+ }
+
+ /** Read from in */
+ private static BlockConstructionStage readFields(DataInput in)
+ throws IOException {
+ return valueOf(in.readByte());
+ }
+
+ /** write to out */
+ private void write(DataOutput out) throws IOException {
+ out.writeByte(ordinal());
+ }
+ }
+
/** @deprecated Deprecated at 0.21. Use Op.WRITE_BLOCK instead. */
@Deprecated
public static final byte OP_WRITE_BLOCK = Op.WRITE_BLOCK.code;
@@ -187,15 +237,19 @@
/** Send OP_WRITE_BLOCK */
public static void opWriteBlock(DataOutputStream out,
- long blockId, long blockGs, int pipelineSize, boolean isRecovery,
- String client, DatanodeInfo src, DatanodeInfo[] targets,
- AccessToken accesstoken) throws IOException {
+ long blockId, long blockGs, int pipelineSize,
+ BlockConstructionStage stage, long newGs, long minBytesRcvd,
+ long maxBytesRcvd, String client, DatanodeInfo src,
+ DatanodeInfo[] targets, AccessToken accesstoken) throws IOException {
op(out, Op.WRITE_BLOCK);
out.writeLong(blockId);
out.writeLong(blockGs);
out.writeInt(pipelineSize);
- out.writeBoolean(isRecovery);
+ stage.write(out);
+ WritableUtils.writeVLong(out, newGs);
+ WritableUtils.writeVLong(out, minBytesRcvd);
+ WritableUtils.writeVLong(out, maxBytesRcvd);
Text.writeString(out, client);
out.writeBoolean(src != null);
@@ -307,7 +361,11 @@
final long blockId = in.readLong();
final long blockGs = in.readLong();
final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
- final boolean isRecovery = in.readBoolean(); // is this part of recovery?
+ final BlockConstructionStage stage =
+ BlockConstructionStage.readFields(in);
+ final long newGs = WritableUtils.readVLong(in);
+ final long minBytesRcvd = WritableUtils.readVLong(in);
+ final long maxBytesRcvd = WritableUtils.readVLong(in);
final String client = Text.readString(in); // working on behalf of this client
final DatanodeInfo src = in.readBoolean()? DatanodeInfo.read(in): null;
@@ -321,8 +379,8 @@
}
final AccessToken accesstoken = readAccessToken(in);
- opWriteBlock(in, blockId, blockGs, pipelineSize, isRecovery,
- client, src, targets, accesstoken);
+ opWriteBlock(in, blockId, blockGs, pipelineSize, stage,
+ newGs, minBytesRcvd, maxBytesRcvd, client, src, targets, accesstoken);
}
/**
@@ -330,7 +388,9 @@
* Write a block.
*/
protected abstract void opWriteBlock(DataInputStream in,
- long blockId, long blockGs, int pipelineSize, boolean isRecovery,
+ long blockId, long blockGs,
+ int pipelineSize, BlockConstructionStage stage,
+ long newGs, long minBytesRcvd, long maxBytesRcvd,
String client, DatanodeInfo src, DatanodeInfo[] targets,
AccessToken accesstoken) throws IOException;
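
A note on the new BlockConstructionStage enum above: getRecoveryStage() relies on each regular stage sitting at an even ordinal with its recovery stage at ordinal|1. A minimal, self-contained Java sketch (illustrative only, not part of the patch) of that pairing:

    public class StageDemo {
      // Ordinals mirror the enum in the diff above.
      enum Stage {
        PIPELINE_SETUP_APPEND,             // 0
        PIPELINE_SETUP_APPEND_RECOVERY,    // 1 = 0|1
        DATA_STREAMING,                    // 2
        PIPELINE_SETUP_STREAMING_RECOVERY, // 3 = 2|1
        PIPELINE_CLOSE,                    // 4
        PIPELINE_CLOSE_RECOVERY,           // 5 = 4|1
        PIPELINE_SETUP_CREATE;             // 6; the real code throws if asked for its recovery stage

        Stage recovery() { return values()[ordinal() | 1]; }
      }

      public static void main(String[] args) {
        System.out.println(Stage.DATA_STREAMING.recovery()); // PIPELINE_SETUP_STREAMING_RECOVERY
        System.out.println(Stage.PIPELINE_CLOSE.recovery()); // PIPELINE_CLOSE_RECOVERY
      }
    }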
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Wed Sep 30 22:57:30 2009
@@ -90,7 +90,9 @@
// Version is reflected in the data storage file.
// Versions are negative.
// Decrement LAYOUT_VERSION to define a new version.
- public static final int LAYOUT_VERSION = -19;
+ public static final int LAYOUT_VERSION = -20;
// Current version:
- // -19: Sticky bit
+ // -20: DataNode adds an "rbw" subdirectory to the data directory:
+ // the current dir contains a "finalized" subdir for finalized replicas
+ // and an "rbw" subdir for replicas being written to.
}
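
Under the new -20 layout, the datanode's data directory roughly looks like the sketch below (illustrative only; directory names are taken from the LAYOUT_VERSION comment above and from DataStorage.java later in this patch):

    ${dfs.data.dir}/current/
        VERSION
        finalized/        finalized replicas (blk_*, blk_*.meta, subdir*)
        rbw/              replicas being written (rbw)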
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Wed Sep 30 22:57:30 2009
@@ -145,4 +145,21 @@
locs[i].readFields(in);
}
}
+
+ /** Read LocatedBlock from in. */
+ public static LocatedBlock read(DataInput in) throws IOException {
+ final LocatedBlock lb = new LocatedBlock();
+ lb.readFields(in);
+ return lb;
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return getClass().getSimpleName() + "{" + b
+ + "; getBlockSize()=" + getBlockSize()
+ + "; corrupt=" + corrupt
+ + "; offset=" + offset
+ + "; locs=" + java.util.Arrays.asList(locs)
+ + "}";
+ }
}
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java Wed Sep 30 22:57:30 2009
@@ -36,6 +36,8 @@
private long fileLength;
private List<LocatedBlock> blocks; // array of blocks with prioritized locations
private boolean underConstruction;
+ private LocatedBlock lastLocatedBlock = null;
+ private boolean isLastBlockComplete = false;
LocatedBlocks() {
fileLength = 0;
@@ -43,11 +45,15 @@
underConstruction = false;
}
- public LocatedBlocks(long flength, List<LocatedBlock> blks, boolean isUnderConstuction) {
-
+ /** public Constructor */
+ public LocatedBlocks(long flength, boolean isUnderConstuction,
+ List<LocatedBlock> blks,
+ LocatedBlock lastBlock, boolean isLastBlockCompleted) {
fileLength = flength;
blocks = blks;
underConstruction = isUnderConstuction;
+ this.lastLocatedBlock = lastBlock;
+ this.isLastBlockComplete = isLastBlockCompleted;
}
/**
@@ -57,6 +63,16 @@
return blocks;
}
+ /** Get the last located block. */
+ public LocatedBlock getLastLocatedBlock() {
+ return lastLocatedBlock;
+ }
+
+ /** Is the last block completed? */
+ public boolean isLastBlockComplete() {
+ return isLastBlockComplete;
+ }
+
/**
* Get located block.
*/
@@ -161,6 +177,15 @@
public void write(DataOutput out) throws IOException {
out.writeLong(this.fileLength);
out.writeBoolean(underConstruction);
+
+ //write the last located block
+ final boolean isNull = lastLocatedBlock == null;
+ out.writeBoolean(isNull);
+ if (!isNull) {
+ lastLocatedBlock.write(out);
+ }
+ out.writeBoolean(isLastBlockComplete);
+
// write located blocks
int nrBlocks = locatedBlockCount();
out.writeInt(nrBlocks);
@@ -175,6 +200,14 @@
public void readFields(DataInput in) throws IOException {
this.fileLength = in.readLong();
underConstruction = in.readBoolean();
+
+ //read the last located block
+ final boolean isNull = in.readBoolean();
+ if (!isNull) {
+ lastLocatedBlock = LocatedBlock.read(in);
+ }
+ isLastBlockComplete = in.readBoolean();
+
// read located blocks
int nrBlocks = in.readInt();
this.blocks = new ArrayList<LocatedBlock>(nrBlocks);
@@ -184,4 +217,18 @@
this.blocks.add(blk);
}
}
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ final StringBuilder b = new StringBuilder(getClass().getSimpleName());
+ b.append("{")
+ .append("\n fileLength=").append(fileLength)
+ .append("\n underConstruction=").append(underConstruction)
+ .append("\n blocks=").append(blocks)
+ .append("\n lastLocatedBlock=").append(lastLocatedBlock)
+ .append("\n isLastBlockComplete=").append(isLastBlockComplete)
+ .append("}");
+ return b.toString();
+ }
}
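
For readers tracking the wire format, a rough, self-contained sketch (placeholder values only, not the actual LocatedBlock serialization) of the field order the updated write()/readFields() pair now uses, with the optional last located block and its completion flag written before the block list:

    import java.io.*;

    public class LocatedBlocksWireSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeLong(1024L);      // fileLength
        out.writeBoolean(true);    // underConstruction
        out.writeBoolean(false);   // is lastLocatedBlock null?
        out.writeLong(7001L);      //   placeholder for lastLocatedBlock.write(out)
        out.writeBoolean(false);   // isLastBlockComplete
        out.writeInt(1);           // number of located blocks
        out.writeLong(7001L);      //   placeholder for each LocatedBlock.write(out)

        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray()));
        System.out.println("fileLength=" + in.readLong()
            + " underConstruction=" + in.readBoolean());
        if (!in.readBoolean()) {   // last located block is present
          System.out.println("lastLocatedBlock placeholder=" + in.readLong());
        }
        System.out.println("isLastBlockComplete=" + in.readBoolean()
            + " blockCount=" + in.readInt());
      }
    }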
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Wed Sep 30 22:57:30 2009
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hdfs.server.common;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
/************************************
* Some handy internal HDFS constants
*
@@ -80,5 +84,77 @@
return description;
}
}
+
+ /**
+ * Block replica states: the states a replica can go through while being constructed.
+ */
+ static public enum ReplicaState {
+ /** Replica is finalized. The state when replica is not modified. */
+ FINALIZED(0),
+ /** Replica is being written to. */
+ RBW(1),
+ /** Replica is waiting to be recovered. */
+ RWR(2),
+ /** Replica is under recovery. */
+ RUR(3),
+ /** Temporary replica: created for replication and relocation only. */
+ TEMPORARY(4);
+
+ private int value;
+
+ private ReplicaState(int v) {
+ value = v;
+ }
+
+ public int getValue() {
+ return value;
+ }
+
+ public static ReplicaState getState(int v) {
+ return ReplicaState.values()[v];
+ }
+
+ /** Read from in */
+ public static ReplicaState read(DataInput in) throws IOException {
+ return values()[in.readByte()];
+ }
+
+ /** Write to out */
+ public void write(DataOutput out) throws IOException {
+ out.writeByte(ordinal());
+ }
+ }
+
+ /**
+ * States a block can go through while it is under construction.
+ */
+ static public enum BlockUCState {
+ /**
+ * Block construction completed.<br>
+ * The block has at least one {@link ReplicaState#FINALIZED} replica,
+ * and is not going to be modified.
+ */
+ COMPLETE,
+ /**
+ * The block is under construction.<br>
+ * It has been recently allocated for write or append.
+ */
+ UNDER_CONSTRUCTION,
+ /**
+ * The block is under recovery.<br>
+ * When a file lease expires its last block may not be {@link #COMPLETE}
+ * and needs to go through a recovery procedure,
+ * which synchronizes the existing replicas contents.
+ */
+ UNDER_RECOVERY,
+ /**
+ * The block is committed.<br>
+ * The client reported that all bytes are written to data-nodes
+ * with the given generation stamp and block length, but no
+ * {@link ReplicaState#FINALIZED}
+ * replicas have yet been reported by the data-nodes themselves.
+ */
+ COMMITTED;
+ }
}
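
The ReplicaState values above go over the wire as a single byte (the enum ordinal). A tiny self-contained round-trip sketch (illustrative only; the enum is redeclared locally):

    import java.io.*;

    public class ReplicaStateWireDemo {
      enum ReplicaState { FINALIZED, RBW, RWR, RUR, TEMPORARY }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        new DataOutputStream(buf).writeByte(ReplicaState.RBW.ordinal()); // write()
        DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(ReplicaState.values()[in.readByte()]);        // read() -> RBW
      }
    }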
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Wed Sep 30 22:57:30 2009
@@ -74,6 +74,9 @@
* any upgrade code that uses this constant should also be removed. */
public static final int PRE_GENERATIONSTAMP_LAYOUT_VERSION = -13;
+ // last layout version that did not support persistent rbw replicas
+ public static final int PRE_RBW_LAYOUT_VERSION = -19;
+
private static final String STORAGE_FILE_LOCK = "in_use.lock";
protected static final String STORAGE_FILE_VERSION = "VERSION";
public static final String STORAGE_DIR_CURRENT = "current";
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Sep 30 22:57:30 2009
@@ -39,6 +39,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
@@ -55,7 +56,6 @@
static final Log ClientTraceLog = DataNode.ClientTraceLog;
private Block block; // the block to receive
- protected boolean finalized;
private DataInputStream in = null; // from where data are read
private DataChecksum checksum; // from where chunks of a block can be read
private OutputStream out = null; // to block file at local disk
@@ -65,7 +65,6 @@
private ByteBuffer buf; // contains one full packet.
private int bufRead; //amount of valid data in the buf
private int maxPacketReadLen;
- protected long offsetInBlock;
protected final String inAddr;
protected final String myAddr;
private String mirrorAddr;
@@ -73,46 +72,83 @@
private Daemon responder = null;
private BlockTransferThrottler throttler;
private FSDataset.BlockWriteStreams streams;
- private boolean isRecovery = false;
private String clientName;
DatanodeInfo srcDataNode = null;
private Checksum partialCrc = null;
private final DataNode datanode;
+ final private ReplicaInPipelineInterface replicaInfo;
BlockReceiver(Block block, DataInputStream in, String inAddr,
- String myAddr, boolean isRecovery, String clientName,
- DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
+ String myAddr, BlockConstructionStage stage,
+ long newGs, long minBytesRcvd, long maxBytesRcvd,
+ String clientName, DatanodeInfo srcDataNode, DataNode datanode)
+ throws IOException {
try{
this.block = block;
this.in = in;
this.inAddr = inAddr;
this.myAddr = myAddr;
- this.isRecovery = isRecovery;
this.clientName = clientName;
- this.offsetInBlock = 0;
this.srcDataNode = srcDataNode;
this.datanode = datanode;
- this.checksum = DataChecksum.newDataChecksum(in);
- this.bytesPerChecksum = checksum.getBytesPerChecksum();
- this.checksumSize = checksum.getChecksumSize();
//
// Open local disk out
//
- streams = datanode.data.writeToBlock(block, isRecovery);
- this.finalized = datanode.data.isValidBlock(block);
+ if (clientName.length() == 0) { //replication or move
+ replicaInfo = datanode.data.createTemporary(block);
+ } else {
+ switch (stage) {
+ case PIPELINE_SETUP_CREATE:
+ replicaInfo = datanode.data.createRbw(block);
+ break;
+ case PIPELINE_SETUP_STREAMING_RECOVERY:
+ replicaInfo = datanode.data.recoverRbw(
+ block, newGs, minBytesRcvd, maxBytesRcvd);
+ block.setGenerationStamp(newGs);
+ break;
+ case PIPELINE_SETUP_APPEND:
+ replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
+ if (datanode.blockScanner != null) { // remove from block scanner
+ datanode.blockScanner.deleteBlock(block);
+ }
+ block.setGenerationStamp(newGs);
+ break;
+ case PIPELINE_SETUP_APPEND_RECOVERY:
+ replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
+ if (datanode.blockScanner != null) { // remove from block scanner
+ datanode.blockScanner.deleteBlock(block);
+ }
+ block.setGenerationStamp(newGs);
+ break;
+ default: throw new IOException("Unsupported stage " + stage +
+ " while receiving block " + block + " from " + inAddr);
+ }
+ }
+ streams = replicaInfo.createStreams();
if (streams != null) {
this.out = streams.dataOut;
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.checksumOut,
SMALL_BUFFER_SIZE));
- // If this block is for appends, then remove it from periodic
- // validation.
- if (datanode.blockScanner != null && isRecovery) {
- datanode.blockScanner.deleteBlock(block);
+
+ // read checksum meta information
+ this.checksum = DataChecksum.newDataChecksum(in);
+ this.bytesPerChecksum = checksum.getBytesPerChecksum();
+ this.checksumSize = checksum.getChecksumSize();
+
+ // write data chunk header if creating a new replica
+ if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE
+ || clientName.length() == 0) {
+ BlockMetadataHeader.writeHeader(checksumOut, checksum);
+ } else {
+ datanode.data.setChannelPosition(block, streams, 0,
+ BlockMetadataHeader.getHeaderSize());
}
}
- } catch (BlockAlreadyExistsException bae) {
+ } catch (ReplicaAlreadyExistsException bae) {
throw bae;
+ } catch (ReplicaNotFoundException bne) {
+ throw bne;
} catch(IOException ioe) {
IOUtils.closeStream(this);
cleanupBlock();
@@ -288,7 +324,7 @@
* It tries to read a full packet with single read call.
* Consecutive packets are usually of the same length.
*/
- private int readNextPacket() throws IOException {
+ private void readNextPacket() throws IOException {
/* This dances around buf a little bit, mainly to read
* full packet with a single read and to accept an arbitrary size
* for next packet at the same time.
@@ -324,12 +360,6 @@
int payloadLen = buf.getInt();
buf.reset();
- if (payloadLen == 0) {
- //end of stream!
- buf.limit(buf.position() + SIZE_OF_INTEGER);
- return 0;
- }
-
// check corrupt values for pktLen, 100MB upper limit should be ok?
if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
throw new IOException("Incorrect value for packet payload : " +
@@ -369,42 +399,58 @@
if (pktSize > maxPacketReadLen) {
maxPacketReadLen = pktSize;
}
-
- return payloadLen;
}
/**
* Receives and processes a packet. It can contain many chunks.
- * returns size of the packet.
+ * returns the number of data bytes that the packet has.
*/
private int receivePacket() throws IOException {
-
- int payloadLen = readNextPacket();
-
- if (payloadLen <= 0) {
- return payloadLen;
- }
+ // read the next packet
+ readNextPacket();
buf.mark();
//read the header
buf.getInt(); // packet length
- offsetInBlock = buf.getLong(); // get offset of packet in block
+ long offsetInBlock = buf.getLong(); // get offset of packet in block
+
+ if (offsetInBlock > replicaInfo.getNumBytes()) {
+ throw new IOException("Received an out-of-sequence packet for " + block +
+ "from " + inAddr + " at offset " + offsetInBlock +
+ ". Expecting packet starting at " + replicaInfo.getNumBytes());
+ }
long seqno = buf.getLong(); // get seqno
boolean lastPacketInBlock = (buf.get() != 0);
+ int len = buf.getInt();
+ if (len < 0) {
+ throw new IOException("Got wrong length during writeBlock(" + block +
+ ") from " + inAddr + " at offset " +
+ offsetInBlock + ": " + len);
+ }
int endOfHeader = buf.position();
buf.reset();
if (LOG.isDebugEnabled()){
LOG.debug("Receiving one packet for block " + block +
- " of length " + payloadLen +
+ " of length " + len +
" seqno " + seqno +
" offsetInBlock " + offsetInBlock +
" lastPacketInBlock " + lastPacketInBlock);
}
- setBlockPosition(offsetInBlock);
+ // update received bytes
+ offsetInBlock += len;
+ if (replicaInfo.getNumBytes() < offsetInBlock) {
+ replicaInfo.setNumBytes(offsetInBlock);
+ }
+ // put in queue for pending acks
+ if (responder != null) {
+ ((PacketResponder)responder.getRunnable()).enqueue(seqno,
+ lastPacketInBlock, offsetInBlock);
+ }
+
//First write the packet to the mirror:
if (mirrorOut != null) {
try {
@@ -416,19 +462,10 @@
}
buf.position(endOfHeader);
- int len = buf.getInt();
- if (len < 0) {
- throw new IOException("Got wrong length during writeBlock(" + block +
- ") from " + inAddr + " at offset " +
- offsetInBlock + ": " + len);
- }
-
- if (len == 0) {
- LOG.debug("Receiving empty packet for block " + block);
+ if (lastPacketInBlock || len == 0) {
+ LOG.debug("Receiving an empty packet or the end of the block " + block);
} else {
- offsetInBlock += len;
-
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize;
@@ -454,8 +491,10 @@
}
try {
- if (!finalized) {
+ if (replicaInfo.getBytesOnDisk()<offsetInBlock) {
//finally write to the disk :
+ setBlockPosition(offsetInBlock-len);
+
out.write(pktBuf, dataOff, len);
// If this is a partial chunk, then verify that this is the only
@@ -476,6 +515,7 @@
} else {
checksumOut.write(pktBuf, checksumOff, checksumLen);
}
+ replicaInfo.setBytesOnDisk(offsetInBlock);
datanode.myMetrics.bytesWritten.inc(len);
}
} catch (IOException iex) {
@@ -487,17 +527,11 @@
/// flush entire packet before sending ack
flush();
- // put in queue for pending acks
- if (responder != null) {
- ((PacketResponder)responder.getRunnable()).enqueue(seqno,
- lastPacketInBlock);
- }
-
if (throttler != null) { // throttle I/O
- throttler.throttle(payloadLen);
+ throttler.throttle(len);
}
- return payloadLen;
+ return len;
}
void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -518,10 +552,6 @@
throttler = throttlerArg;
try {
- // write data chunk header
- if (!finalized) {
- BlockMetadataHeader.writeHeader(checksumOut, checksum);
- }
if (clientName.length() > 0) {
responder = new Daemon(datanode.threadGroup,
new PacketResponder(this, block, mirrIn,
@@ -530,20 +560,10 @@
}
/*
- * Receive until packet length is zero.
+ * Receive until packet has zero bytes of data.
*/
while (receivePacket() > 0) {}
- // flush the mirror out
- if (mirrorOut != null) {
- try {
- mirrorOut.writeInt(0); // mark the end of the block
- mirrorOut.flush();
- } catch (IOException e) {
- handleMirrorOutError(e);
- }
- }
-
// wait for all outstanding packet responses. And then
// indicate responder to gracefully shutdown.
// Mark that responder has been closed for future processing
@@ -560,7 +580,7 @@
close();
// Finalize the block. Does this fsync()?
- block.setNumBytes(offsetInBlock);
+ block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
datanode.myMetrics.blocksWritten.inc();
}
@@ -601,21 +621,6 @@
* Sets the file pointer in the local block file to the specified value.
*/
private void setBlockPosition(long offsetInBlock) throws IOException {
- if (finalized) {
- if (!isRecovery) {
- throw new IOException("Write to offset " + offsetInBlock +
- " of block " + block +
- " that is already finalized.");
- }
- if (offsetInBlock > datanode.data.getLength(block)) {
- throw new IOException("Write to offset " + offsetInBlock +
- " of block " + block +
- " that is already finalized and is of size " +
- datanode.data.getLength(block));
- }
- return;
- }
-
if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) {
return; // nothing to do
}
@@ -732,12 +737,13 @@
* enqueue the seqno that is still to be acked by the downstream datanode.
* @param seqno
* @param lastPacketInBlock
+ * @param lastByteInPacket
*/
- synchronized void enqueue(long seqno, boolean lastPacketInBlock) {
+ synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
if (running) {
LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
" to ack queue.");
- ackQueue.addLast(new Packet(seqno, lastPacketInBlock));
+ ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
notifyAll();
}
}
@@ -808,26 +814,22 @@
// If this is the last packet in block, then close block
// file and finalize the block before responding success
if (pkt.lastPacketInBlock) {
- if (!receiver.finalized) {
- receiver.close();
- final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
- block.setNumBytes(receiver.offsetInBlock);
- datanode.data.finalizeBlock(block);
- datanode.myMetrics.blocksWritten.inc();
- datanode.notifyNamenodeReceivedBlock(block,
- DataNode.EMPTY_DEL_HINT);
- if (ClientTraceLog.isInfoEnabled() &&
- receiver.clientName.length() > 0) {
- long offset = 0;
- ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
- receiver.inAddr, receiver.myAddr, block.getNumBytes(),
- "HDFS_WRITE", receiver.clientName, offset,
- datanode.dnRegistration.getStorageID(), block, endTime-startTime));
- } else {
- LOG.info("Received block " + block +
- " of size " + block.getNumBytes() +
- " from " + receiver.inAddr);
- }
+ receiver.close();
+ final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
+ block.setNumBytes(replicaInfo.getNumBytes());
+ datanode.data.finalizeBlock(block);
+ datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+ if (ClientTraceLog.isInfoEnabled() &&
+ receiver.clientName.length() > 0) {
+ long offset = 0;
+ ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
+ receiver.inAddr, receiver.myAddr, block.getNumBytes(),
+ "HDFS_WRITE", receiver.clientName, offset,
+ datanode.dnRegistration.getStorageID(), block, endTime-startTime));
+ } else {
+ LOG.info("Received block " + block +
+ " of size " + block.getNumBytes() +
+ " from " + receiver.inAddr);
}
lastPacket = true;
}
@@ -835,6 +837,9 @@
replyOut.writeLong(expected);
SUCCESS.write(replyOut);
replyOut.flush();
+ if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+ replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+ }
} catch (Exception e) {
LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
if (running) {
@@ -867,6 +872,7 @@
}
boolean lastPacketInBlock = false;
+ Packet pkt = null;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (running && datanode.shouldRun && !lastPacketInBlock) {
@@ -889,7 +895,6 @@
} else {
LOG.debug("PacketResponder " + numTargets + " got seqno = " +
seqno);
- Packet pkt = null;
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
@@ -947,14 +952,12 @@
// If this is the last packet in block, then close block
// file and finalize the block before responding success
- if (lastPacketInBlock && !receiver.finalized) {
+ if (lastPacketInBlock) {
receiver.close();
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
- block.setNumBytes(receiver.offsetInBlock);
+ block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
- datanode.myMetrics.blocksWritten.inc();
- datanode.notifyNamenodeReceivedBlock(block,
- DataNode.EMPTY_DEL_HINT);
+ datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
if (ClientTraceLog.isInfoEnabled() &&
receiver.clientName.length() > 0) {
long offset = 0;
@@ -978,12 +981,14 @@
" responded my status " +
" for seqno " + expected);
+ boolean success = true;
// forward responses from downstream datanodes.
for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
try {
if (op == SUCCESS) {
op = Status.read(mirrorIn);
if (op != SUCCESS) {
+ success = false;
LOG.debug("PacketResponder for block " + block +
": error code received from downstream " +
" datanode[" + i + "] " + op);
@@ -991,6 +996,7 @@
}
} catch (Throwable e) {
op = ERROR;
+ success = false;
}
op.write(replyOut);
}
@@ -998,6 +1004,10 @@
LOG.debug("PacketResponder " + block + " " + numTargets +
" responded other status " + " for seqno " + expected);
+ if (pkt != null && success &&
+ pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+ replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+ }
// If we were unable to read the seqno from downstream, then stop.
if (expected == -2) {
running = false;
@@ -1039,10 +1049,12 @@
static private class Packet {
long seqno;
boolean lastPacketInBlock;
+ long lastByteInBlock;
- Packet(long seqno, boolean lastPacketInBlock) {
+ Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
this.seqno = seqno;
this.lastPacketInBlock = lastPacketInBlock;
+ this.lastByteInBlock = lastByteInPacket;
}
}
}
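
With the old end-of-stream marker (a trailing zero-length int) removed, every packet now carries the same fixed header and the last packet is flagged explicitly. A small self-contained sketch (hand-built values, not the real sender) of the header fields that BlockSender.sendChunks() in the next file writes and BlockReceiver.receivePacket() above parses:

    import java.nio.ByteBuffer;

    public class PacketHeaderDemo {
      public static void main(String[] args) {
        ByteBuffer pkt = ByteBuffer.allocate(25);
        pkt.putInt(4);        // packetLen = dataLen + checksums + 4 (empty last packet: 4)
        pkt.putLong(4096L);   // offset of this packet in the block
        pkt.putLong(17L);     // sequence number
        pkt.put((byte) 1);    // lastPacketInBlock flag
        pkt.putInt(0);        // number of data bytes in this packet
        pkt.flip();

        System.out.println("packetLen=" + pkt.getInt()
            + " offsetInBlock=" + pkt.getLong()
            + " seqno=" + pkt.getLong()
            + " last=" + (pkt.get() != 0)
            + " dataLen=" + pkt.getInt());
      }
    }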
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Wed Sep 30 22:57:30 2009
@@ -46,13 +46,18 @@
static final Log ClientTraceLog = DataNode.ClientTraceLog;
private Block block; // the block to read from
+
+ /** the replica to read from */
+ private final Replica replica;
+ /** The visible length of a replica. */
+ private final long replicaVisibleLength;
+
private InputStream blockIn; // data stream
private long blockInPosition = -1; // updated while using transferTo().
private DataInputStream checksumIn; // checksum datastream
private DataChecksum checksum; // checksum stream
private long offset; // starting position to read
private long endOffset; // ending position
- private long blockLength;
private int bytesPerChecksum; // chunk size
private int checksumSize; // checksum size
private boolean corruptChecksumOk; // if need to verify checksum
@@ -86,10 +91,29 @@
throws IOException {
try {
this.block = block;
+ synchronized(datanode.data) {
+ this.replica = datanode.data.getReplica(block.getBlockId());
+ if (replica == null) {
+ throw new ReplicaNotFoundException(block);
+ }
+ this.replicaVisibleLength = replica.getVisibleLength();
+ }
+ if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+ throw new IOException(
+ "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+ + block + ", replica=" + replica);
+ }
+ if (replicaVisibleLength < 0) {
+ throw new IOException("The replica is not readable, block="
+ + block + ", replica=" + replica);
+ }
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("block=" + block + ", replica=" + replica);
+ }
+
this.chunkOffsetOK = chunkOffsetOK;
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
- this.blockLength = datanode.data.getLength(block);
this.transferToAllowed = datanode.transferToAllowed;
this.clientTraceFmt = clientTraceFmt;
@@ -119,18 +143,18 @@
* blockLength.
*/
bytesPerChecksum = checksum.getBytesPerChecksum();
- if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
+ if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) {
checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
- Math.max((int)blockLength, 10*1024*1024));
+ Math.max((int)replicaVisibleLength, 10*1024*1024));
bytesPerChecksum = checksum.getBytesPerChecksum();
}
checksumSize = checksum.getChecksumSize();
if (length < 0) {
- length = blockLength;
+ length = replicaVisibleLength;
}
- endOffset = blockLength;
+ endOffset = replicaVisibleLength;
if (startOffset < 0 || startOffset > endOffset
|| (length + startOffset) > endOffset) {
String msg = " Offset " + startOffset + " and length " + length
@@ -163,6 +187,18 @@
}
seqno = 0;
+ //sleep a few times if getBytesOnDisk() < visible length
+ for(int i = 0; i < 30 && replica.getBytesOnDisk() < replicaVisibleLength; i++) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
+ }
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("replica=" + replica);
+ }
+
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
} catch (IOException ioe) {
IOUtils.closeStream(this);
@@ -234,10 +270,6 @@
int len = Math.min((int) (endOffset - offset),
bytesPerChecksum*maxChunks);
- if (len == 0) {
- return 0;
- }
-
int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
int packetLen = len + numChunks*checksumSize + 4;
pkt.clear();
@@ -246,7 +278,7 @@
pkt.putInt(packetLen);
pkt.putLong(offset);
pkt.putLong(seqno);
- pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
+ pkt.put((byte)((len == 0) ? 1 : 0));
//why no ByteBuf.putBoolean()?
pkt.putInt(len);
@@ -407,7 +439,8 @@
seqno++;
}
try {
- out.writeInt(0); // mark the end of block
+ // send an empty packet to mark the end of the block
+ sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
@@ -420,7 +453,7 @@
close();
}
- blockReadFully = (initialOffset == 0 && offset >= blockLength);
+ blockReadFully = initialOffset == 0 && offset >= replicaVisibleLength;
return totalRead;
}
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Wed Sep 30 22:57:30 2009
@@ -29,11 +29,11 @@
import java.io.PrintStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Random;
import java.util.TreeSet;
import java.util.regex.Matcher;
@@ -211,8 +211,8 @@
private void init() {
// get the list of blocks and arrange them in random order
- Block arr[] = dataset.getBlockReport();
- Collections.shuffle(Arrays.asList(arr));
+ List<Block> arr = dataset.getFinalizedBlocks();
+ Collections.shuffle(arr);
blockInfoSet = new TreeSet<BlockScanInfo>();
blockMap = new HashMap<Block, BlockScanInfo>();
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Sep 30 22:57:30 2009
@@ -34,11 +34,10 @@
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
@@ -55,11 +54,13 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -68,6 +69,7 @@
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -75,7 +77,9 @@
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
@@ -168,8 +172,6 @@
volatile boolean shouldRun = true;
private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
- /** list of blocks being recovered */
- private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
private LinkedList<String> delHints = new LinkedList<String>();
public final static String EMPTY_DEL_HINT = "";
AtomicInteger xmitsInProgress = new AtomicInteger();
@@ -912,7 +914,7 @@
processDistributedUpgradeCommand((UpgradeCommand)cmd);
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
- recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
+ recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
@@ -989,13 +991,12 @@
// and can be safely GC'ed.
//
long brStartTime = now();
- Block[] bReport = data.getBlockReport();
+ BlockListAsLongs bReport = data.getBlockReport();
- cmd = namenode.blockReport(dnRegistration,
- BlockListAsLongs.convertToArrayLongs(bReport));
+ cmd = namenode.blockReport(dnRegistration, bReport.getBlockListAsLongs());
long brTime = now() - brStartTime;
myMetrics.blockReports.inc(brTime);
- LOG.info("BlockReport of " + bReport.length +
+ LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
" blocks got processed in " + brTime + " msecs");
//
// If we have sent the first block report, then wait a random
@@ -1250,7 +1251,8 @@
EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
}
DataTransferProtocol.Sender.opWriteBlock(out,
- b.getBlockId(), b.getGenerationStamp(), 0, false, "",
+ b.getBlockId(), b.getGenerationStamp(), 0,
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, "",
srcNode, targets, accessToken);
// send data & checksum
@@ -1273,6 +1275,20 @@
}
}
}
+
+ /**
+ * After a block becomes finalized, the datanode increments its metrics counter,
+ * notifies the namenode, and adds the block to the block scanner.
+ * @param block
+ * @param delHint
+ */
+ void closeBlock(Block block, String delHint) {
+ myMetrics.blocksWritten.inc();
+ notifyNamenodeReceivedBlock(block, delHint);
+ if (blockScanner != null) {
+ blockScanner.addBlock(block);
+ }
+ }
/**
* No matter what kind of exception we get, keep retrying to offerService().
@@ -1514,16 +1530,16 @@
return info;
}
- public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
+ public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) {
Daemon d = new Daemon(threadGroup, new Runnable() {
/** Recover a list of blocks. It is run by the primary datanode. */
public void run() {
- for(int i = 0; i < blocks.length; i++) {
+ for(RecoveringBlock b : blocks) {
try {
- logRecoverBlock("NameNode", blocks[i], targets[i]);
- recoverBlock(blocks[i], false, targets[i], true);
+ logRecoverBlock("NameNode", b.getBlock(), b.getLocations());
+ recoverBlock(b);
} catch (IOException e) {
- LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
+ LOG.warn("recoverBlocks FAILED: " + b, e);
}
}
}
@@ -1548,6 +1564,38 @@
}
}
+ @Override // InterDatanodeProtocol
+ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+ throws IOException {
+ return data.initReplicaRecovery(rBlock);
+ }
+
+ /**
+ * Convenience method, which unwraps RemoteException.
+ * @throws IOException not a RemoteException.
+ */
+ private static ReplicaRecoveryInfo callInitReplicaRecovery(
+ InterDatanodeProtocol datanode,
+ RecoveringBlock rBlock) throws IOException {
+ try {
+ return datanode.initReplicaRecovery(rBlock);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException();
+ }
+ }
+
+ /**
+ * Update replica with the new generation stamp and length.
+ */
+ @Override // InterDatanodeProtocol
+ public Block updateReplicaUnderRecovery(Block oldBlock,
+ long recoveryId,
+ long newLength) throws IOException {
+ ReplicaInfo r =
+ data.updateReplicaUnderRecovery(oldBlock, recoveryId, newLength);
+ return new Block(r);
+ }
+
/** {@inheritDoc} */
public long getProtocolVersion(String protocol, long clientVersion
) throws IOException {
@@ -1560,164 +1608,171 @@
+ ": " + protocol);
}
- /** A convenient class used in lease recovery */
+ /** A convenient class used in block recovery */
private static class BlockRecord {
final DatanodeID id;
final InterDatanodeProtocol datanode;
- final Block block;
+ final ReplicaRecoveryInfo rInfo;
- BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
+ BlockRecord(DatanodeID id,
+ InterDatanodeProtocol datanode,
+ ReplicaRecoveryInfo rInfo) {
this.id = id;
this.datanode = datanode;
- this.block = block;
+ this.rInfo = rInfo;
}
/** {@inheritDoc} */
public String toString() {
- return "block:" + block + " node:" + id;
+ return "block:" + rInfo + " node:" + id;
}
}
/** Recover a block */
- private LocatedBlock recoverBlock(Block block, boolean keepLength,
- DatanodeInfo[] targets, boolean closeFile) throws IOException {
-
+ private void recoverBlock(RecoveringBlock rBlock) throws IOException {
+ Block block = rBlock.getBlock();
+ DatanodeInfo[] targets = rBlock.getLocations();
DatanodeID[] datanodeids = (DatanodeID[])targets;
- // If the block is already being recovered, then skip recovering it.
- // This can happen if the namenode and client start recovering the same
- // file at the same time.
- synchronized (ongoingRecovery) {
- Block tmp = new Block();
- tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
- if (ongoingRecovery.get(tmp) != null) {
- String msg = "Block " + block + " is already being recovered, " +
- " ignoring this request to recover it.";
- LOG.info(msg);
- throw new IOException(msg);
- }
- ongoingRecovery.put(block, block);
- }
- try {
- List<BlockRecord> syncList = new ArrayList<BlockRecord>();
- long minlength = Long.MAX_VALUE;
- int errorCount = 0;
+ List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length);
+ int errorCount = 0;
- //check generation stamps
- for(DatanodeID id : datanodeids) {
- try {
- InterDatanodeProtocol datanode = dnRegistration.equals(id)?
- this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
- BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
- if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
- if (keepLength) {
- if (info.getNumBytes() == block.getNumBytes()) {
- syncList.add(new BlockRecord(id, datanode, new Block(info)));
- }
- }
- else {
- syncList.add(new BlockRecord(id, datanode, new Block(info)));
- if (info.getNumBytes() < minlength) {
- minlength = info.getNumBytes();
- }
- }
- }
- } catch (IOException e) {
- ++errorCount;
- InterDatanodeProtocol.LOG.warn(
- "Failed to getBlockMetaDataInfo for block (=" + block
- + ") from datanode (=" + id + ")", e);
+ //check generation stamps
+ for(DatanodeID id : datanodeids) {
+ try {
+ InterDatanodeProtocol datanode = dnRegistration.equals(id)?
+ this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
+ ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
+ if (info != null &&
+ info.getGenerationStamp() >= block.getGenerationStamp() &&
+ info.getNumBytes() > 0) {
+ syncList.add(new BlockRecord(id, datanode, info));
}
+ } catch (RecoveryInProgressException ripE) {
+ InterDatanodeProtocol.LOG.warn(
+ "Recovery for replica " + block + " on data-node " + id
+ + " is already in progress. Recovery id = "
+ + rBlock.getNewGenerationStamp() + " is aborted.", ripE);
+ return;
+ } catch (IOException e) {
+ ++errorCount;
+ InterDatanodeProtocol.LOG.warn(
+ "Failed to obtain replica info for block (=" + block
+ + ") from datanode (=" + id + ")", e);
}
+ }
- if (syncList.isEmpty() && errorCount > 0) {
- throw new IOException("All datanodes failed: block=" + block
- + ", datanodeids=" + Arrays.asList(datanodeids));
- }
- if (!keepLength) {
- block.setNumBytes(minlength);
- }
- return syncBlock(block, syncList, targets, closeFile);
- } finally {
- synchronized (ongoingRecovery) {
- ongoingRecovery.remove(block);
- }
+ if (errorCount == datanodeids.length) {
+ throw new IOException("All datanodes failed: block=" + block
+ + ", datanodeids=" + Arrays.asList(datanodeids));
}
+
+ syncBlock(rBlock, syncList);
}
/** Block synchronization */
- private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
- DatanodeInfo[] targets, boolean closeFile) throws IOException {
+ private void syncBlock(RecoveringBlock rBlock,
+ List<BlockRecord> syncList) throws IOException {
+ Block block = rBlock.getBlock();
+ long recoveryId = rBlock.getNewGenerationStamp();
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
- + "), syncList=" + syncList + ", closeFile=" + closeFile);
+ + "), syncList=" + syncList);
}
- //syncList.isEmpty() that all datanodes do not have the block
- //so the block can be deleted.
+ // syncList.isEmpty() means that all data-nodes do not have the block
+ // or their replicas have 0 length.
+ // The block can be deleted.
if (syncList.isEmpty()) {
- namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
- DatanodeID.EMPTY_ARRAY);
- //always return a new access token even if everything else stays the same
- LocatedBlock b = new LocatedBlock(block, targets);
- if (isAccessTokenEnabled) {
- b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
- .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
- }
- return b;
+ namenode.commitBlockSynchronization(block, recoveryId, 0,
+ true, true, DatanodeID.EMPTY_ARRAY);
+ return;
}
- List<DatanodeID> successList = new ArrayList<DatanodeID>();
+ // Calculate the best available replica state.
+ ReplicaState bestState = ReplicaState.RWR;
+ long finalizedLength = -1;
+ for(BlockRecord r : syncList) {
+ assert r.rInfo.getNumBytes() > 0 : "zero length replica";
+ ReplicaState rState = r.rInfo.getOriginalReplicaState();
+ if(rState.getValue() < bestState.getValue())
+ bestState = rState;
+ if(rState == ReplicaState.FINALIZED) {
+ if(finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes())
+ throw new IOException("Inconsistent size of finalized replicas. " +
+ "Replica " + r.rInfo + " expected size: " + finalizedLength);
+ finalizedLength = r.rInfo.getNumBytes();
+ }
+ }
- long generationstamp = namenode.nextGenerationStamp(block);
- Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp);
+ // Calculate list of nodes that will participate in the recovery
+ // and the new block size
+ List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
+ Block newBlock = new Block(block.getBlockId(), -1, recoveryId);
+ switch(bestState) {
+ case FINALIZED:
+ assert finalizedLength > 0 : "finalizedLength is not positive";
+ for(BlockRecord r : syncList) {
+ ReplicaState rState = r.rInfo.getOriginalReplicaState();
+ if(rState == ReplicaState.FINALIZED ||
+ rState == ReplicaState.RBW &&
+ r.rInfo.getNumBytes() == finalizedLength)
+ participatingList.add(r);
+ }
+ newBlock.setNumBytes(finalizedLength);
+ break;
+ case RBW:
+ case RWR:
+ long minLength = Long.MAX_VALUE;
+ for(BlockRecord r : syncList) {
+ ReplicaState rState = r.rInfo.getOriginalReplicaState();
+ if(rState == bestState) {
+ minLength = Math.min(minLength, r.rInfo.getNumBytes());
+ participatingList.add(r);
+ }
+ }
+ newBlock.setNumBytes(minLength);
+ break;
+ case RUR:
+ case TEMPORARY:
+ assert false : "bad replica state: " + bestState;
+ }
- for(BlockRecord r : syncList) {
+ List<DatanodeID> failedList = new ArrayList<DatanodeID>();
+ List<DatanodeID> successList = new ArrayList<DatanodeID>();
+ for(BlockRecord r : participatingList) {
try {
- r.datanode.updateBlock(r.block, newblock, closeFile);
+ Block reply = r.datanode.updateReplicaUnderRecovery(
+ r.rInfo, recoveryId, newBlock.getNumBytes());
+ assert reply.equals(newBlock) &&
+ reply.getNumBytes() == newBlock.getNumBytes() :
+ "Updated replica must be the same as the new block.";
successList.add(r.id);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
- + newblock + ", datanode=" + r.id + ")", e);
+ + newBlock + ", datanode=" + r.id + ")", e);
+ failedList.add(r.id);
}
}
- if (!successList.isEmpty()) {
- DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
-
- namenode.commitBlockSynchronization(block,
- newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
- nlist);
- DatanodeInfo[] info = new DatanodeInfo[nlist.length];
- for (int i = 0; i < nlist.length; i++) {
- info[i] = new DatanodeInfo(nlist[i]);
- }
- LocatedBlock b = new LocatedBlock(newblock, info); // success
- // should have used client ID to generate access token, but since
- // owner ID is not checked, we simply pass null for now.
- if (isAccessTokenEnabled) {
- b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
- .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ // If any of the data-nodes failed, the recovery fails, because
+ // we never know the actual state of the replica on failed data-nodes.
+ // The recovery should be started over.
+ if(!failedList.isEmpty()) {
+ StringBuilder b = new StringBuilder();
+ for(DatanodeID id : failedList) {
+ b.append("\n " + id);
}
- return b;
+ throw new IOException("Cannot recover " + block + ", the following "
+ + failedList.size() + " data-nodes failed {" + b + "\n}");
}
- //failed
- StringBuilder b = new StringBuilder();
- for(BlockRecord r : syncList) {
- b.append("\n " + r.id);
- }
- throw new IOException("Cannot recover " + block + ", none of these "
- + syncList.size() + " datanodes success {" + b + "\n}");
+ // Notify the name-node about successfully recovered replicas.
+ DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
+ namenode.commitBlockSynchronization(block,
+ newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
+ nlist);
}
- // ClientDataNodeProtocol implementation
- /** {@inheritDoc} */
- public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
- ) throws IOException {
- logRecoverBlock("Client", block, targets);
- return recoverBlock(block, keepLength, targets, false);
- }
-
private static void logRecoverBlock(String who,
Block block, DatanodeID[] targets) {
StringBuilder msg = new StringBuilder(targets[0].getName());
@@ -1727,4 +1782,11 @@
LOG.info(who + " calls recoverBlock(block=" + block
+ ", targets=[" + msg + "])");
}
+
+ // ClientDataNodeProtocol implementation
+ /** {@inheritDoc} */
+ @Override // ClientDataNodeProtocol
+ public long getReplicaVisibleLength(final Block block) throws IOException {
+ return data.getReplicaVisibleLength(block);
+ }
}
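
The new syncBlock() logic above derives the recovered block length from the surviving replicas' original states. A loose, self-contained re-statement (not the commit's classes) of just that length calculation: a FINALIZED replica wins and fixes the length; otherwise the shortest replica in the best remaining state (RBW over RWR) is used.

    import java.util.Arrays;
    import java.util.List;

    public class SyncBlockLengthSketch {
      enum State { FINALIZED, RBW, RWR }   // ordered best to worst, as in ReplicaState

      static class Replica {
        final State state;
        final long numBytes;
        Replica(State state, long numBytes) { this.state = state; this.numBytes = numBytes; }
      }

      static long recoveredLength(List<Replica> syncList) {
        State best = State.RWR;
        long finalizedLength = -1;
        for (Replica r : syncList) {
          if (r.state.ordinal() < best.ordinal()) best = r.state;
          if (r.state == State.FINALIZED) finalizedLength = r.numBytes;
        }
        if (best == State.FINALIZED) {
          return finalizedLength;            // all finalized replicas must agree on this
        }
        long min = Long.MAX_VALUE;           // RBW or RWR: truncate to the shortest replica
        for (Replica r : syncList) {
          if (r.state == best) min = Math.min(min, r.numBytes);
        }
        return min;
      }

      public static void main(String[] args) {
        List<Replica> syncList = Arrays.asList(
            new Replica(State.RBW, 3000), new Replica(State.RWR, 5000),
            new Replica(State.RBW, 4096));
        System.out.println(recoveredLength(syncList)); // 3000: shortest RBW replica
      }
    }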
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Wed Sep 30 22:57:30 2009
@@ -53,6 +53,9 @@
final static String BLOCK_SUBDIR_PREFIX = "subdir";
final static String BLOCK_FILE_PREFIX = "blk_";
final static String COPY_FILE_PREFIX = "dncp_";
+ final static String STORAGE_DIR_RBW = "rbw";
+ final static String STORAGE_DIR_FINALIZED = "finalized";
+ final static String STORAGE_DIR_DETACHED = "detach";
private String storageID;
@@ -270,6 +273,8 @@
File curDir = sd.getCurrentDir();
File prevDir = sd.getPreviousDir();
assert curDir.exists() : "Current directory must exist.";
+ // Cleanup directory "detach"
+ cleanupDetachDir(new File(curDir, STORAGE_DIR_DETACHED));
// delete previous dir before upgrading
if (prevDir.exists())
deleteDir(prevDir);
@@ -277,8 +282,11 @@
assert !tmpDir.exists() : "previous.tmp directory must not exist.";
// rename current to tmp
rename(curDir, tmpDir);
- // hardlink blocks
- linkBlocks(tmpDir, curDir, this.getLayoutVersion());
+ // hard link finalized & rbw blocks
+ linkAllBlocks(tmpDir, curDir);
+ // create current directory if not exists
+ if (!curDir.exists() && !curDir.mkdirs())
+ throw new IOException("Cannot create directory " + curDir);
// write version file
this.layoutVersion = FSConstants.LAYOUT_VERSION;
assert this.namespaceID == nsInfo.getNamespaceID() :
@@ -290,6 +298,30 @@
LOG.info("Upgrade of " + sd.getRoot()+ " is complete.");
}
+ /**
+ * Cleanup the detachDir.
+ *
+ * If the directory is not empty report an error;
+ * Otherwise remove the directory.
+ *
+ * @param detachDir detach directory
+ * @throws IOException if the directory is not empty or it can not be removed
+ */
+ private void cleanupDetachDir(File detachDir) throws IOException {
+ if (layoutVersion >= PRE_RBW_LAYOUT_VERSION &&
+ detachDir.exists() && detachDir.isDirectory() ) {
+
+ if (detachDir.list().length != 0 ) {
+ throw new IOException("Detached directory " + detachDir +
+ " is not empty. Please manually move each file under this " +
+ "directory to the finalized directory if the finalized " +
+ "directory tree does not have the file.");
+ } else if (!detachDir.delete()) {
+ throw new IOException("Cannot remove directory " + detachDir);
+ }
+ }
+ }
+
void doRollback( StorageDirectory sd,
NamespaceInfo nsInfo
) throws IOException {
@@ -359,8 +391,34 @@
doFinalize(it.next());
}
}
+
+ /**
+ * Hardlink all finalized and RBW blocks in fromDir to toDir
+ * @param fromDir directory where the snapshot is stored
+ * @param toDir the current data directory
+ * @throws IOException if error occurs during hardlink
+ */
+ private void linkAllBlocks(File fromDir, File toDir) throws IOException {
+ // do the link
+ int diskLayoutVersion = this.getLayoutVersion();
+ if (diskLayoutVersion < PRE_RBW_LAYOUT_VERSION) { // RBW version
+ // hardlink finalized blocks in tmpDir/finalized
+ linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED),
+ new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion);
+ // hardlink rbw blocks in tmpDir/rbw
+ linkBlocks(new File(fromDir, STORAGE_DIR_RBW),
+ new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion);
+ } else { // pre-RBW version
+ // hardlink finalized blocks in tmpDir
+ linkBlocks(fromDir,
+ new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion);
+ }
+ }
static void linkBlocks(File from, File to, int oldLV) throws IOException {
+ if (!from.exists()) {
+ return;
+ }
if (!from.isDirectory()) {
if (from.getName().startsWith(COPY_FILE_PREFIX)) {
FileInputStream in = new FileInputStream(from);
@@ -387,7 +445,7 @@
return;
}
// from is a directory
- if (!to.mkdir())
+ if (!to.mkdirs())
throw new IOException("Cannot create directory " + to);
String[] blockNames = from.list(new java.io.FilenameFilter() {
public boolean accept(File dir, String name) {
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Sep 30 22:57:30 2009
@@ -38,6 +38,7 @@
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
import org.apache.hadoop.io.IOUtils;
@@ -208,7 +209,8 @@
*/
@Override
protected void opWriteBlock(DataInputStream in, long blockId, long blockGs,
- int pipelineSize, boolean isRecovery,
+ int pipelineSize, BlockConstructionStage stage,
+ long newGs, long minBytesRcvd, long maxBytesRcvd,
String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
AccessToken accessToken) throws IOException {
@@ -250,11 +252,17 @@
String firstBadLink = ""; // first datanode that failed in connection setup
DataTransferProtocol.Status mirrorInStatus = SUCCESS;
try {
- // open a block receiver and check if the block does not exist
- blockReceiver = new BlockReceiver(block, in,
- s.getRemoteSocketAddress().toString(),
- s.getLocalSocketAddress().toString(),
- isRecovery, client, srcDataNode, datanode);
+ if (client.length() == 0 ||
+ stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+ // open a block receiver
+ blockReceiver = new BlockReceiver(block, in,
+ s.getRemoteSocketAddress().toString(),
+ s.getLocalSocketAddress().toString(),
+ stage, newGs, minBytesRcvd, maxBytesRcvd,
+ client, srcDataNode, datanode);
+ } else {
+ datanode.data.recoverClose(block, newGs, minBytesRcvd);
+ }
//
// Open network conn to backup machine, if
@@ -282,10 +290,13 @@
// Write header: Copied from DFSClient.java!
DataTransferProtocol.Sender.opWriteBlock(mirrorOut,
- block.getBlockId(), block.getGenerationStamp(), pipelineSize,
- isRecovery, client, srcDataNode, targets, accessToken);
+ blockId, blockGs,
+ pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, client,
+ srcDataNode, targets, accessToken);
- blockReceiver.writeChecksumHeader(mirrorOut);
+ if (blockReceiver != null) { // send checksum header
+ blockReceiver.writeChecksumHeader(mirrorOut);
+ }
mirrorOut.flush();
// read connect ack (only for clients, not for replication req)
@@ -336,24 +347,31 @@
}
// receive the block and mirror to the next target
- String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
- blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
- mirrorAddr, null, targets.length);
+ if (blockReceiver != null) {
+ String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
+ blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
+ mirrorAddr, null, targets.length);
+ }
- // if this write is for a replication request (and not
- // from a client), then confirm block. For client-writes,
+ // update its generation stamp
+ if (client.length() != 0 &&
+ stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+ block.setGenerationStamp(newGs);
+ block.setNumBytes(minBytesRcvd);
+ }
+
+ // if this write is for a replication request or recovering
+ // a failed close for client, then confirm block. For other client-writes,
// the block is finalized in the PacketResponder.
- if (client.length() == 0) {
- datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
+ if (client.length() == 0 ||
+ stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+ datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
LOG.info("Received block " + block +
" src: " + remoteAddress +
" dest: " + localAddress +
" of size " + block.getNumBytes());
}
- if (datanode.blockScanner != null) {
- datanode.blockScanner.addBlock(block);
- }
} catch (IOException ioe) {
LOG.info("writeBlock " + block + " received exception " + ioe);
@@ -569,7 +587,7 @@
blockReceiver = new BlockReceiver(
block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
- false, "", null, datanode);
+ null, 0, 0, 0, "", null, datanode);
// receive a block
blockReceiver.receiveBlock(null, null, null, null,