You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/09/02 06:16:51 UTC
svn commit: r810353 - in /hadoop/hdfs/branches/HDFS-265: ./
src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Author: hairong
Date: Wed Sep 2 04:16:51 2009
New Revision: 810353
URL: http://svn.apache.org/viewvc?rev=810353&view=rev
Log:
HDFS-537. DataNode exposes a replica's meta info to BlockReceiver. Contributed by Hairong Kuang.
Added:
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
Modified:
hadoop/hdfs/branches/HDFS-265/CHANGES.txt
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
hadoop/hdfs/branches/HDFS-265/src/test/findbugsExcludeFile.xml
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Wed Sep 2 04:16:51 2009
@@ -16,6 +16,10 @@
HDFS-565. Introduce block committing logic during new block allocation
and file close. (shv)
+ HDFS-537. DataNode exposes a replica's meta info to BlockReceiver for the
+ support of dfs writes/hflush. It also updates a replica's bytes received,
+ bytes on disk, and bytes acked after receiving a packet. (hairong)
+
IMPROVEMENTS
HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Sep 2 04:16:51 2009
@@ -65,7 +65,6 @@
private ByteBuffer buf; // contains one full packet.
private int bufRead; //amount of valid data in the buf
private int maxPacketReadLen;
- protected long offsetInBlock;
protected final String inAddr;
protected final String myAddr;
private String mirrorAddr;
@@ -78,6 +77,7 @@
DatanodeInfo srcDataNode = null;
private Checksum partialCrc = null;
private final DataNode datanode;
+ final private ReplicaInPipelineInterface replicaInfo;
BlockReceiver(Block block, DataInputStream in, String inAddr,
String myAddr, boolean isRecovery, String clientName,
@@ -89,7 +89,6 @@
this.myAddr = myAddr;
this.isRecovery = isRecovery;
this.clientName = clientName;
- this.offsetInBlock = 0;
this.srcDataNode = srcDataNode;
this.datanode = datanode;
this.checksum = DataChecksum.newDataChecksum(in);
@@ -100,13 +99,14 @@
// Open local disk out
//
if (clientName.length() == 0) { //replication or move
- streams = datanode.data.writeToTemporary(block);
+ replicaInfo = datanode.data.writeToTemporary(block);
} else if (finalized && isRecovery) { // client append
- streams = datanode.data.append(block);
+ replicaInfo = datanode.data.append(block);
this.finalized = false;
} else { // client write
- streams = datanode.data.writeToRbw(block, isRecovery);
+ replicaInfo = datanode.data.writeToRbw(block, isRecovery);
}
+ streams = replicaInfo.createStreams();
if (streams != null) {
this.out = streams.dataOut;
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
@@ -397,10 +397,22 @@
buf.mark();
//read the header
buf.getInt(); // packet length
- offsetInBlock = buf.getLong(); // get offset of packet in block
+ long offsetInBlock = buf.getLong(); // get offset of packet in block
+
+ if (offsetInBlock > replicaInfo.getNumBytes()) {
+ throw new IOException("Received an out-of-sequence packet for " + block +
+ "from " + inAddr + " at offset " + offsetInBlock +
+ ". Expecting packet starting at " + replicaInfo.getNumBytes());
+ }
long seqno = buf.getLong(); // get seqno
boolean lastPacketInBlock = (buf.get() != 0);
+ int len = buf.getInt();
+ if (len < 0) {
+ throw new IOException("Got wrong length during writeBlock(" + block +
+ ") from " + inAddr + " at offset " +
+ offsetInBlock + ": " + len);
+ }
int endOfHeader = buf.position();
buf.reset();
@@ -412,8 +424,12 @@
" lastPacketInBlock " + lastPacketInBlock);
}
- setBlockPosition(offsetInBlock);
-
+ // update received bytes
+ offsetInBlock += len;
+ if (replicaInfo.getNumBytes() < offsetInBlock) {
+ replicaInfo.setNumBytes(offsetInBlock);
+ }
+
//First write the packet to the mirror:
if (mirrorOut != null) {
try {
@@ -425,19 +441,10 @@
}
buf.position(endOfHeader);
- int len = buf.getInt();
- if (len < 0) {
- throw new IOException("Got wrong length during writeBlock(" + block +
- ") from " + inAddr + " at offset " +
- offsetInBlock + ": " + len);
- }
-
if (len == 0) {
LOG.debug("Receiving empty packet for block " + block);
} else {
- offsetInBlock += len;
-
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize;
@@ -463,8 +470,10 @@
}
try {
- if (!finalized) {
+ if (!finalized && replicaInfo.getBytesOnDisk()<offsetInBlock) {
//finally write to the disk :
+ setBlockPosition(offsetInBlock-len);
+
out.write(pktBuf, dataOff, len);
// If this is a partial chunk, then verify that this is the only
@@ -485,6 +494,7 @@
} else {
checksumOut.write(pktBuf, checksumOff, checksumLen);
}
+ replicaInfo.setBytesOnDisk(offsetInBlock);
datanode.myMetrics.bytesWritten.inc(len);
}
} catch (IOException iex) {
@@ -499,7 +509,7 @@
// put in queue for pending acks
if (responder != null) {
((PacketResponder)responder.getRunnable()).enqueue(seqno,
- lastPacketInBlock);
+ lastPacketInBlock, offsetInBlock);
}
if (throttler != null) { // throttle I/O
@@ -569,7 +579,7 @@
close();
// Finalize the block. Does this fsync()?
- block.setNumBytes(offsetInBlock);
+ block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
datanode.myMetrics.blocksWritten.inc();
}
@@ -741,12 +751,13 @@
* enqueue the seqno that is still be to acked by the downstream datanode.
* @param seqno
* @param lastPacketInBlock
+ * @param lastByteInPacket
*/
- synchronized void enqueue(long seqno, boolean lastPacketInBlock) {
+ synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
if (running) {
LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
" to ack queue.");
- ackQueue.addLast(new Packet(seqno, lastPacketInBlock));
+ ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
notifyAll();
}
}
@@ -820,7 +831,7 @@
if (!receiver.finalized) {
receiver.close();
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
- block.setNumBytes(receiver.offsetInBlock);
+ block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
datanode.myMetrics.blocksWritten.inc();
datanode.notifyNamenodeReceivedBlock(block,
@@ -910,6 +921,9 @@
}
pkt = ackQueue.removeFirst();
expected = pkt.seqno;
+ if (pkt.lastByteInBlock > replicaInfo.getBytesAcked()) {
+ replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+ }
notifyAll();
LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
if (seqno != expected) {
@@ -953,7 +967,7 @@
if (lastPacketInBlock && !receiver.finalized) {
receiver.close();
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
- block.setNumBytes(receiver.offsetInBlock);
+ block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
datanode.myMetrics.blocksWritten.inc();
datanode.notifyNamenodeReceivedBlock(block,
@@ -1042,10 +1056,12 @@
static private class Packet {
long seqno;
boolean lastPacketInBlock;
+ long lastByteInBlock;
- Packet(long seqno, boolean lastPacketInBlock) {
+ Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
this.seqno = seqno;
this.lastPacketInBlock = lastPacketInBlock;
+ this.lastByteInBlock = lastByteInPacket;
}
}
}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Sep 2 04:16:51 2009
@@ -954,12 +954,6 @@
new FileInputStream(metaInFile.getFD()));
}
- private BlockWriteStreams createBlockWriteStreams( File f , File metafile) throws IOException {
- return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()),
- new FileOutputStream( new RandomAccessFile( metafile , "rw" ).getFD() ));
-
- }
-
/**
* Make a copy of the block if this block is linked to an existing
* snapshot. This ensures that modifying this block does not modify
@@ -1100,7 +1094,7 @@
}
@Override // FSDatasetInterface
- public BlockWriteStreams append(Block b)
+ public ReplicaInPipelineInterface append(Block b)
throws IOException {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
@@ -1129,9 +1123,9 @@
FSVolume v = volumes.getNextVolume(b.getNumBytes());
File newBlkFile = v.createRbwFile(b);
File oldmeta = replicaInfo.getMetaFile();
- replicaInfo = new ReplicaBeingWritten(replicaInfo,
+ ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(replicaInfo,
v, newBlkFile.getParentFile(), Thread.currentThread());
- File newmeta = replicaInfo.getMetaFile();
+ File newmeta = newReplicaInfo.getMetaFile();
// rename meta file to rbw directory
if (DataNode.LOG.isDebugEnabled()) {
@@ -1158,31 +1152,23 @@
}
// Replace finalized replica by a RBW replica in replicas map
- volumeMap.add(replicaInfo);
+ volumeMap.add(newReplicaInfo);
- File metafile = getMetaFile(newBlkFile, b);
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("append blockfile is " + newBlkFile
- + " of size " + newBlkFile.length());
- DataNode.LOG.debug("append metafile is " + metafile
- + " of size " + metafile.length());
- }
- // return the write stream
- return createBlockWriteStreams(newBlkFile , metafile);
+ return newReplicaInfo;
}
@Override
- public BlockWriteStreams writeToRbw(Block b, boolean isRecovery)
+ public ReplicaInPipelineInterface writeToRbw(Block b, boolean isRecovery)
throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b);
- File f = null;
+ ReplicaBeingWritten newReplicaInfo;
if (replicaInfo == null) { // create a new block
FSVolume v = volumes.getNextVolume(b.getNumBytes());
// create a rbw file to hold block in the designated volume
- f = v.createRbwFile(b);
- replicaInfo = new ReplicaBeingWritten(b.getBlockId(),
+ File f = v.createRbwFile(b);
+ newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile());
- volumeMap.add(replicaInfo);
+ volumeMap.add(newReplicaInfo);
} else {
if (!isRecovery) {
throw new BlockAlreadyExistsException("Block " + b +
@@ -1193,35 +1179,21 @@
throw new BlockNotFoundException(
BlockNotFoundException.NON_RBW_REPLICA + b);
}
- ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline)replicaInfo;
+ newReplicaInfo = (ReplicaBeingWritten)replicaInfo;
synchronized (this) {
//
// Is it already in the write process?
//
- replicaInPipeline.stopWriter();
- replicaInPipeline.setWriter(Thread.currentThread());
+ newReplicaInfo.stopWriter();
+ newReplicaInfo.setWriter(Thread.currentThread());
}
- f = replicaInfo.getBlockFile();
}
- //
- // Finally, allow a writer to the block file
- // REMIND - mjc - make this a filter stream that enforces a max
- // block size, so clients can't go crazy
- //
- File metafile = getMetaFile(f, b);
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("writeToRbw blockfile is " + f +
- " of size " + f.length());
- DataNode.LOG.debug("writeToRbw metafile is " + metafile +
- " of size " + metafile.length());
- }
- return createBlockWriteStreams( f , metafile);
-
+ return newReplicaInfo;
}
@Override
- public BlockWriteStreams writeToTemporary(Block b)
+ public ReplicaInPipelineInterface writeToTemporary(Block b)
throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b);
if (replicaInfo != null) {
@@ -1230,23 +1202,14 @@
" and thus cannot be created.");
}
- File f = null;
FSVolume v = volumes.getNextVolume(b.getNumBytes());
// create a temporary file to hold block in the designated volume
- f = v.createTmpFile(b);
- replicaInfo = new ReplicaInPipeline(b.getBlockId(),
+ File f = v.createTmpFile(b);
+ ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile());
- volumeMap.add(replicaInfo);
+ volumeMap.add(newReplicaInfo);
- // return the output streams
- File metafile = getMetaFile(f, b);
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("writeToTemp blockfile is " + f +
- " of size " + f.length());
- DataNode.LOG.debug("writeToTemp metafile is " + metafile +
- " of size " + metafile.length());
- }
- return createBlockWriteStreams( f , metafile);
+ return newReplicaInfo;
}
/**
@@ -1293,16 +1256,6 @@
return vol.createTmpFile(blk);
}
- synchronized File createRbwFile( FSVolume vol, Block blk ) throws IOException {
- if ( vol == null ) {
- vol = getReplicaInfo( blk ).getVolume();
- if ( vol == null ) {
- throw new IOException("Could not find volume for block " + blk);
- }
- }
- return vol.createTmpFile(blk);
- }
-
//
// REMIND - mjc - eventually we should have a timeout system
// in place to clean up block files left by abandoned clients.
@@ -1329,11 +1282,11 @@
File f = replicaInfo.getBlockFile();
if (v == null) {
throw new IOException("No volume for temporary file " + f +
- " for block " + b);
+ " for block " + replicaInfo);
}
- File dest = v.addBlock(b, f);
- newReplicaInfo = new FinalizedReplica(b, v, dest.getParentFile());
+ File dest = v.addBlock(replicaInfo, f);
+ newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
}
volumeMap.add(newReplicaInfo);
}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Wed Sep 2 04:16:51 2009
@@ -174,11 +174,10 @@
* Creates a temporary replica and returns output streams to write data and CRC
*
* @param b block
- * @return a BlockWriteStreams object to allow writing the block data
- * and CRC
+ * @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public BlockWriteStreams writeToTemporary(Block b) throws IOException;
+ public ReplicaInPipelineInterface writeToTemporary(Block b) throws IOException;
/**
* Creates/recovers a RBW replica and returns output streams to
@@ -186,20 +185,19 @@
*
* @param b block
* @param isRecovery True if this is part of error recovery, otherwise false
- * @return a BlockWriteStreams object to allow writing the block data
- * and CRC
+ * @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public BlockWriteStreams writeToRbw(Block b, boolean isRecovery) throws IOException;
+ public ReplicaInPipelineInterface writeToRbw(Block b, boolean isRecovery)
+ throws IOException;
/**
* Append to a finalized replica and returns output streams to write data and CRC
* @param b block
- * @return a BlockWriteStreams object to allow writing the block data
- * and CRC
+ * @return the meta info of the replica which is being written to
* @throws IOException
*/
- public BlockWriteStreams append(Block b) throws IOException;
+ public ReplicaInPipelineInterface append(Block b) throws IOException;
/**
* Update the block to the new generation stamp and length.
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Wed Sep 2 04:16:51 2009
@@ -18,11 +18,15 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.RandomAccessFile;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
+import org.apache.hadoop.io.IOUtils;
/**
* This class defines a replica in a pipeline, which
@@ -32,7 +36,8 @@
*
* The base class implements a temporary replica
*/
-class ReplicaInPipeline extends ReplicaInfo {
+class ReplicaInPipeline extends ReplicaInfo
+ implements ReplicaInPipelineInterface {
private long bytesAcked;
private long bytesOnDisk;
private Thread writer;
@@ -91,35 +96,23 @@
return ReplicaState.TEMPORARY;
}
- /**
- * Get the number of bytes acked
- * @return the number of bytes acked
- */
- long getBytesAcked() {
+ @Override // ReplicaInPipelineInterface
+ public long getBytesAcked() {
return bytesAcked;
}
- /**
- * Set the number bytes that have acked
- * @param bytesAcked
- */
- void setBytesAcked(long bytesAcked) {
+ @Override // ReplicaInPipelineInterface
+ public void setBytesAcked(long bytesAcked) {
this.bytesAcked = bytesAcked;
}
- /**
- * Get the number of bytes that have written to disk
- * @return the number of bytes that have written to disk
- */
- long getBytesOnDisk() {
+ @Override // ReplicaInPipelineInterface
+ public long getBytesOnDisk() {
return bytesOnDisk;
}
- /**
- * Set the number of bytes on disk
- * @param bytesOnDisk number of bytes on disk
- */
- void setBytesOnDisk(long bytesOnDisk) {
+ @Override //ReplicaInPipelineInterface
+ public void setBytesOnDisk(long bytesOnDisk) {
this.bytesOnDisk = bytesOnDisk;
}
@@ -155,4 +148,29 @@
public int hashCode() {
return super.hashCode();
}
+
+ @Override // ReplicaInPipelineInterface
+ public BlockWriteStreams createStreams() throws IOException {
+ File blockFile = getBlockFile();
+ File metaFile = getMetaFile();
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("writeTo blockfile is " + blockFile +
+ " of size " + blockFile.length());
+ DataNode.LOG.debug("writeTo metafile is " + metaFile +
+ " of size " + metaFile.length());
+ }
+ FileOutputStream blockOut = null;
+ FileOutputStream crcOut = null;
+ try {
+ blockOut = new FileOutputStream(
+ new RandomAccessFile( blockFile, "rw" ).getFD() );
+ crcOut = new FileOutputStream(
+ new RandomAccessFile( metaFile, "rw" ).getFD() );
+ return new BlockWriteStreams(blockOut, crcOut);
+ } catch (IOException e) {
+ IOUtils.closeStream(blockOut);
+ IOUtils.closeStream(crcOut);
+ throw e;
+ }
+ }
}
Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java?rev=810353&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java Wed Sep 2 04:16:51 2009
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
+
+/**
+ * This defines the interface of a replica in Pipeline that's being written to
+ */
+interface ReplicaInPipelineInterface {
+ /**
+ * Get the number of bytes received
+ * @return the number of bytes that have been received
+ */
+ long getNumBytes();
+
+ /**
+ * Set the number of bytes received
+ * @param bytesReceived number of bytes received
+ */
+ void setNumBytes(long bytesReceived);
+
+ /**
+ * Get the number of bytes acked
+ * @return the number of bytes acked
+ */
+ long getBytesAcked();
+
+ /**
+ * Set the number of bytes that have been acked
+ * @param bytesAcked
+ */
+ void setBytesAcked(long bytesAcked);
+
+ /**
+ * Get the number of bytes that have been written to disk
+ * @return the number of bytes that have been written to disk
+ */
+ long getBytesOnDisk();
+
+ /**
+ * Set the number of bytes on disk
+ * @param bytesOnDisk number of bytes on disk
+ */
+ void setBytesOnDisk(long bytesOnDisk);
+
+ /**
+ * Create output streams for writing to this replica,
+ * one for block file and one for CRC file
+ *
+ * @return output streams for writing
+ * @throws IOException if any error occurs
+ */
+ public BlockWriteStreams createStreams() throws IOException;
+}
Modified: hadoop/hdfs/branches/HDFS-265/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/findbugsExcludeFile.xml?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/findbugsExcludeFile.xml Wed Sep 2 04:16:51 2009
@@ -214,10 +214,7 @@
-->
<Match>
<Class name="org.apache.hadoop.hdfs.server.datanode.FSDataset" />
- <Or>
- <Method name="createBlockWriteStreams" />
- <Method name="getTmpInputStreams" />
- </Or>
+ <Method name="getTmpInputStreams" />
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Wed Sep 2 04:16:51 2009
@@ -77,11 +77,14 @@
nullCrcFileData[i+2] = nullCrcHeader[i];
}
}
-
- private class BInfo { // information about a single block
+
+ // information about a single block
+ private class BInfo implements ReplicaInPipelineInterface {
Block theBlock;
private boolean finalized = false; // if not finalized => ongoing creation
SimulatedOutputStream oStream = null;
+ private long bytesAcked;
+ private long bytesRcvd;
BInfo(Block b, boolean forWriting) throws IOException {
theBlock = new Block(b);
if (theBlock.getNumBytes() < 0) {
@@ -108,20 +111,21 @@
synchronized void updateBlock(Block b) {
theBlock.setGenerationStamp(b.getGenerationStamp());
- setlength(b.getNumBytes());
+ setNumBytes(b.getNumBytes());
+ setBytesOnDisk(b.getNumBytes());
}
- synchronized long getlength() {
+ synchronized public long getNumBytes() {
if (!finalized) {
- return oStream.getLength();
+ return bytesRcvd;
} else {
return theBlock.getNumBytes();
}
}
- synchronized void setlength(long length) {
+ synchronized public void setNumBytes(long length) {
if (!finalized) {
- oStream.setLength(length);
+ bytesRcvd = length;
} else {
theBlock.setNumBytes(length);
}
@@ -170,7 +174,20 @@
oStream = null;
return;
}
-
+
+ synchronized void unfinalizeBlock() throws IOException {
+ if (!finalized) {
+ throw new IOException("Unfinalized a block that's not finalized "
+ + theBlock);
+ }
+ finalized = false;
+ oStream = new SimulatedOutputStream();
+ long blockLen = theBlock.getNumBytes();
+ oStream.setLength(blockLen);
+ bytesRcvd = blockLen;
+ bytesAcked = blockLen;
+ }
+
SimulatedInputStream getMetaIStream() {
return new SimulatedInputStream(nullCrcFileData);
}
@@ -178,6 +195,49 @@
synchronized boolean isFinalized() {
return finalized;
}
+
+ @Override
+ synchronized public BlockWriteStreams createStreams() throws IOException {
+ if (finalized) {
+ throw new IOException("Trying to write to a finalized replica "
+ + theBlock);
+ } else {
+ SimulatedOutputStream crcStream = new SimulatedOutputStream();
+ return new BlockWriteStreams(oStream, crcStream);
+ }
+ }
+
+ @Override
+ synchronized public long getBytesAcked() {
+ if (finalized) {
+ return theBlock.getNumBytes();
+ } else {
+ return bytesAcked;
+ }
+ }
+
+ @Override
+ synchronized public void setBytesAcked(long bytesAcked) {
+ if (!finalized) {
+ this.bytesAcked = bytesAcked;
+ }
+ }
+
+ @Override
+ synchronized public long getBytesOnDisk() {
+ if (finalized) {
+ return theBlock.getNumBytes();
+ } else {
+ return oStream.getLength();
+ }
+ }
+
+ @Override
+ synchronized public void setBytesOnDisk(long bytesOnDisk) {
+ if (!finalized) {
+ oStream.setLength(bytesOnDisk);
+ }
+ }
}
static private class SimulatedStorage {
@@ -311,7 +371,7 @@
if (binfo == null) {
throw new IOException("Finalizing a non existing block " + b);
}
- return binfo.getlength();
+ return binfo.getNumBytes();
}
/** {@inheritDoc} */
@@ -322,7 +382,7 @@
return null;
}
b.setGenerationStamp(binfo.getGenerationStamp());
- b.setNumBytes(binfo.getlength());
+ b.setNumBytes(binfo.getNumBytes());
return b;
}
@@ -350,7 +410,7 @@
DataNode.LOG.warn("Invalidate: Missing block");
continue;
}
- storage.free(binfo.getlength());
+ storage.free(binfo.getNumBytes());
blockMap.remove(b);
}
if (error) {
@@ -381,25 +441,35 @@
}
@Override
- public BlockWriteStreams append(Block b) throws IOException {
- return writeToBlock(b, true);
+ public ReplicaInPipelineInterface append(Block b) throws IOException {
+ BInfo binfo = blockMap.get(b);
+ if (binfo == null || !binfo.isFinalized()) {
+ throw new BlockNotFoundException("Block " + b
+ + " is not valid, and cannot be appended to.");
+ }
+ binfo.unfinalizeBlock();
+ return binfo;
}
@Override
- public synchronized BlockWriteStreams writeToRbw(Block b, boolean isRecovery)
- throws IOException {
- return writeToBlock(b, isRecovery);
+ public synchronized ReplicaInPipelineInterface writeToRbw(Block b,
+ boolean isRecovery) throws IOException {
+ if (isValidBlock(b)) {
+ throw new BlockAlreadyExistsException("Block " + b
+ + " is valid, and cannot be written to.");
+ }
+ BInfo binfo = blockMap.get(b);
+ if (isRecovery && binfo != null) {
+ return binfo;
+ }
+ binfo = new BInfo(b, true);
+ blockMap.put(b, binfo);
+ return binfo;
}
@Override
- public synchronized BlockWriteStreams writeToTemporary(Block b)
+ public synchronized ReplicaInPipelineInterface writeToTemporary(Block b)
throws IOException {
- return writeToBlock(b, false);
- }
-
- private synchronized BlockWriteStreams writeToBlock(Block b,
- boolean isRecovery)
- throws IOException {
if (isValidBlock(b)) {
throw new BlockAlreadyExistsException("Block " + b +
" is valid, and cannot be written to.");
@@ -408,10 +478,9 @@
throw new BlockAlreadyExistsException("Block " + b +
" is being written, and cannot be written to.");
}
- BInfo binfo = new BInfo(b, true);
- blockMap.put(b, binfo);
- SimulatedOutputStream crcStream = new SimulatedOutputStream();
- return new BlockWriteStreams(binfo.oStream, crcStream);
+ BInfo binfo = new BInfo(b, true);
+ blockMap.put(b, binfo);
+ return binfo;
}
public synchronized InputStream getBlockInputStream(Block b)
@@ -500,7 +569,7 @@
if (binfo == null) {
throw new IOException("No such Block " + b );
}
- return binfo.getlength();
+ return binfo.getNumBytes();
}
public synchronized void setChannelPosition(Block b, BlockWriteStreams stream,
@@ -510,7 +579,7 @@
if (binfo == null) {
throw new IOException("No such Block " + b );
}
- binfo.setlength(dataOffset);
+ binfo.setBytesOnDisk(dataOffset);
}
/**
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java Wed Sep 2 04:16:51 2009
@@ -27,6 +27,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
import org.apache.hadoop.util.DataChecksum;
/**
@@ -62,14 +63,19 @@
int bytesAdded = 0;
for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) {
Block b = new Block(i, 0, 0); // we pass expected len as zero, - fsdataset should use the sizeof actual data written
- OutputStream dataOut = fsdataset.writeToRbw(b, false).dataOut;
- assertEquals(0, fsdataset.getLength(b));
- for (int j=1; j <= blockIdToLen(i); ++j) {
- dataOut.write(j);
- assertEquals(j, fsdataset.getLength(b)); // correct length even as we write
- bytesAdded++;
+ ReplicaInPipelineInterface bInfo = fsdataset.writeToRbw(b, false);
+ BlockWriteStreams out = bInfo.createStreams();
+ try {
+ OutputStream dataOut = out.dataOut;
+ assertEquals(0, fsdataset.getLength(b));
+ for (int j=1; j <= blockIdToLen(i); ++j) {
+ dataOut.write(j);
+ assertEquals(j, bInfo.getBytesOnDisk()); // correct length even as we write
+ bytesAdded++;
+ }
+ } finally {
+ out.close();
}
- dataOut.close();
b.setNumBytes(blockIdToLen(i));
fsdataset.finalizeBlock(b);
assertEquals(blockIdToLen(i), fsdataset.getLength(b));
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java Wed Sep 2 04:16:51 2009
@@ -120,10 +120,10 @@
}
private void testAppend(FSDataset dataSet) throws IOException {
- dataSet.append(blocks[FINALIZED]).close(); // successful
+ dataSet.append(blocks[FINALIZED]); // successful
try {
- dataSet.append(blocks[TEMPORARY]).close();
+ dataSet.append(blocks[TEMPORARY]);
Assert.fail("Should not have appended to a temporary replica "
+ blocks[TEMPORARY]);
} catch (BlockNotFoundException e) {
@@ -132,7 +132,7 @@
}
try {
- dataSet.append(blocks[RBW]).close();
+ dataSet.append(blocks[RBW]);
Assert.fail("Should not have appended to an RBW replica" + blocks[RBW]);
} catch (BlockNotFoundException e) {
Assert.assertEquals(BlockNotFoundException.UNFINALIZED_REPLICA +
@@ -140,7 +140,7 @@
}
try {
- dataSet.append(blocks[RWR]).close();
+ dataSet.append(blocks[RWR]);
Assert.fail("Should not have appended to an RWR replica" + blocks[RWR]);
} catch (BlockNotFoundException e) {
Assert.assertEquals(BlockNotFoundException.UNFINALIZED_REPLICA +
@@ -148,7 +148,7 @@
}
try {
- dataSet.append(blocks[RUR]).close();
+ dataSet.append(blocks[RUR]);
Assert.fail("Should not have appended to an RUR replica" + blocks[RUR]);
} catch (BlockNotFoundException e) {
Assert.assertEquals(BlockNotFoundException.UNFINALIZED_REPLICA +
@@ -156,7 +156,7 @@
}
try {
- dataSet.append(blocks[NON_EXISTENT]).close();
+ dataSet.append(blocks[NON_EXISTENT]);
Assert.fail("Should not have appended to a non-existent replica " +
blocks[NON_EXISTENT]);
} catch (BlockNotFoundException e) {
@@ -167,7 +167,7 @@
private void testWriteToRbw(FSDataset dataSet) throws IOException {
try {
- dataSet.writeToRbw(blocks[FINALIZED], true).close();
+ dataSet.writeToRbw(blocks[FINALIZED], true);
Assert.fail("Should not have recovered a finalized replica " +
blocks[FINALIZED]);
} catch (BlockNotFoundException e) {
@@ -176,14 +176,14 @@
}
try {
- dataSet.writeToRbw(blocks[FINALIZED], false).close();
+ dataSet.writeToRbw(blocks[FINALIZED], false);
Assert.fail("Should not have created a replica that's already " +
"finalized " + blocks[FINALIZED]);
} catch (BlockAlreadyExistsException e) {
}
try {
- dataSet.writeToRbw(blocks[TEMPORARY], true).close();
+ dataSet.writeToRbw(blocks[TEMPORARY], true);
Assert.fail("Should not have recovered a temporary replica " +
blocks[TEMPORARY]);
} catch (BlockNotFoundException e) {
@@ -192,23 +192,23 @@
}
try {
- dataSet.writeToRbw(blocks[TEMPORARY], false).close();
+ dataSet.writeToRbw(blocks[TEMPORARY], false);
Assert.fail("Should not have created a replica that had created as " +
"temporary " + blocks[TEMPORARY]);
} catch (BlockAlreadyExistsException e) {
}
- dataSet.writeToRbw(blocks[RBW], true).close(); // expect to be successful
+ dataSet.writeToRbw(blocks[RBW], true); // expect to be successful
try {
- dataSet.writeToRbw(blocks[RBW], false).close();
+ dataSet.writeToRbw(blocks[RBW], false);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (BlockAlreadyExistsException e) {
}
try {
- dataSet.writeToRbw(blocks[RWR], true).close();
+ dataSet.writeToRbw(blocks[RWR], true);
Assert.fail("Should not have recovered a RWR replica " + blocks[RWR]);
} catch (BlockNotFoundException e) {
Assert.assertEquals(BlockNotFoundException.NON_RBW_REPLICA +
@@ -216,14 +216,14 @@
}
try {
- dataSet.writeToRbw(blocks[RWR], false).close();
+ dataSet.writeToRbw(blocks[RWR], false);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (BlockAlreadyExistsException e) {
}
try {
- dataSet.writeToRbw(blocks[RUR], true).close();
+ dataSet.writeToRbw(blocks[RUR], true);
Assert.fail("Should not have recovered a RUR replica " + blocks[RUR]);
} catch (BlockNotFoundException e) {
Assert.assertEquals(BlockNotFoundException.NON_RBW_REPLICA +
@@ -231,58 +231,58 @@
}
try {
- dataSet.writeToRbw(blocks[RUR], false).close();
+ dataSet.writeToRbw(blocks[RUR], false);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (BlockAlreadyExistsException e) {
}
- dataSet.writeToRbw(blocks[NON_EXISTENT], true).close();
+ dataSet.writeToRbw(blocks[NON_EXISTENT], true);
// remove this replica
ReplicaInfo removedReplica = dataSet.volumeMap.remove(blocks[NON_EXISTENT]);
removedReplica.getBlockFile().delete();
removedReplica.getMetaFile().delete();
- dataSet.writeToRbw(blocks[NON_EXISTENT], false).close();
+ dataSet.writeToRbw(blocks[NON_EXISTENT], false);
}
private void testWriteToTemporary(FSDataset dataSet) throws IOException {
try {
- dataSet.writeToTemporary(blocks[FINALIZED]).close();
+ dataSet.writeToTemporary(blocks[FINALIZED]);
Assert.fail("Should not have created a temporary replica that was " +
"finalized " + blocks[FINALIZED]);
} catch (BlockAlreadyExistsException e) {
}
try {
- dataSet.writeToTemporary(blocks[TEMPORARY]).close();
+ dataSet.writeToTemporary(blocks[TEMPORARY]);
Assert.fail("Should not have created a replica that had created as" +
"temporary " + blocks[TEMPORARY]);
} catch (BlockAlreadyExistsException e) {
}
try {
- dataSet.writeToTemporary(blocks[RBW]).close();
+ dataSet.writeToTemporary(blocks[RBW]);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (BlockAlreadyExistsException e) {
}
try {
- dataSet.writeToTemporary(blocks[RWR]).close();
+ dataSet.writeToTemporary(blocks[RWR]);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (BlockAlreadyExistsException e) {
}
try {
- dataSet.writeToTemporary(blocks[RUR]).close();
+ dataSet.writeToTemporary(blocks[RUR]);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (BlockAlreadyExistsException e) {
}
- dataSet.writeToTemporary(blocks[NON_EXISTENT]).close();
+ dataSet.writeToTemporary(blocks[NON_EXISTENT]);
}
}