You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/09/26 18:58:52 UTC
svn commit: r579716 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Author: dhruba
Date: Wed Sep 26 09:58:48 2007
New Revision: 579716
URL: http://svn.apache.org/viewvc?rev=579716&view=rev
Log:
HADOOP-1908. Restructure data node code so that block sending and
receiving are separated from data transfer header handling.
(Hairong Kuang via dhruba)
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=579716&r1=579715&r2=579716&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Sep 26 09:58:48 2007
@@ -193,6 +193,10 @@
IMPROVEMENTS
+ HADOOP-1908. Restructure data node code so that block sending and
+ receiving are seperated from data transfer header handling.
+ (Hairong Kuang via dhruba)
+
HADOOP-1921. Save the configuration of completed/failed jobs and make them
available via the web-ui. (Amar Kamat via devaraj)
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=579716&r1=579715&r2=579716&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Sep 26 09:58:48 2007
@@ -1619,24 +1619,20 @@
try {
- while ( bytesLeft >= 0 ) {
+ while ( bytesLeft > 0 ) {
int len = (int) Math.min( bytesLeft, bytesPerChecksum );
- if ( len > 0 ) {
- IOUtils.readFully( in, buf, 0, len + checksumSize);
- }
+ IOUtils.readFully( in, buf, 0, len + checksumSize);
blockStream.writeInt( len );
blockStream.write( buf, 0, len + checksumSize );
- if ( bytesLeft == 0 ) {
- break;
- }
-
bytesLeft -= len;
if (progress != null) { progress.progress(); }
}
-
+
+ // write 0 to mark the end of a block
+ blockStream.writeInt(0);
blockStream.flush();
numSuccessfulWrites++;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=579716&r1=579715&r2=579716&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Sep 26 09:58:48 2007
@@ -744,9 +744,10 @@
* Read/write data from/to the DataXceiveServer.
*/
public void run() {
+ DataInputStream in=null;
try {
- DataInputStream in = new DataInputStream(
- new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
+ in = new DataInputStream(
+ new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
short version = in.readShort();
if ( version != DATA_TRANFER_VERSION ) {
throw new IOException( "Version Mismatch" );
@@ -770,12 +771,10 @@
} catch (Throwable t) {
LOG.error("DataXceiver: " + StringUtils.stringifyException(t));
} finally {
- try {
- xceiverCount.decr();
- LOG.debug("Number of active connections is: "+xceiverCount);
- s.close();
- } catch (IOException ie2) {
- }
+ xceiverCount.decr();
+ LOG.debug("Number of active connections is: "+xceiverCount);
+ IOUtils.closeStream(in);
+ IOUtils.closeSocket(s);
}
}
@@ -793,11 +792,23 @@
long startOffset = in.readLong();
long length = in.readLong();
-
+
+ // send the block
+ DataOutputStream out = new DataOutputStream(
+ new BufferedOutputStream(s.getOutputStream(), BUFFER_SIZE));
+ BlockSender blockSender = null;
try {
- //XXX Buffered output stream?
- long read = sendBlock(s, block, startOffset, length, null );
- myMetrics.readBytes((int)read);
+ try {
+ blockSender = new BlockSender(block, startOffset, length, true, true);
+ } catch(IOException e) {
+ out.writeShort(OP_STATUS_ERROR);
+ throw e;
+ }
+
+ out.writeShort(DataNode.OP_STATUS_SUCCESS); // send op status
+ long read = blockSender.sendBlock(out, null); // send data
+
+ myMetrics.readBytes((int) read);
myMetrics.readBlocks(1);
LOG.info("Served block " + block + " to " + s.getInetAddress());
} catch ( SocketException ignored ) {
@@ -808,14 +819,18 @@
* Earlier version shutdown() datanode if there is disk error.
*/
LOG.warn( "Got exception while serving " + block + " to " +
- s.getInetAddress() + ": " +
+ s.getInetAddress() + ":\n" +
StringUtils.stringifyException(ioe) );
throw ioe;
+ } finally {
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(blockSender);
}
}
/**
* Write a block to disk.
+ *
* @param in The stream to read from
* @throws IOException
*/
@@ -823,62 +838,45 @@
//
// Read in the header
//
- DataOutputStream reply = new DataOutputStream(s.getOutputStream());
- DataOutputStream out = null;
- DataOutputStream checksumOut = null;
- Socket mirrorSock = null;
- DataOutputStream mirrorOut = null;
- DataInputStream mirrorIn = null;
-
+ Block block = new Block(in.readLong(), 0);
+ int numTargets = in.readInt();
+ if (numTargets < 0) {
+ throw new IOException("Mislabelled incoming datastream.");
+ }
+ DatanodeInfo targets[] = new DatanodeInfo[numTargets];
+ for (int i = 0; i < targets.length; i++) {
+ DatanodeInfo tmp = new DatanodeInfo();
+ tmp.readFields(in);
+ targets[i] = tmp;
+ }
+
+ short opStatus = OP_STATUS_SUCCESS; // write operation status
+ DataOutputStream mirrorOut = null; // stream to next target
+ Socket mirrorSock = null; // socket to next target
+ BlockReceiver blockReceiver = null; // responsible for data handling
try {
- /* We need an estimate for block size to check if the
- * disk partition has enough space. For now we just increment
- * FSDataset.reserved by configured dfs.block.size
- * Other alternative is to include the block size in the header
- * sent by DFSClient.
- */
- Block block = new Block( in.readLong(), 0 );
- int numTargets = in.readInt();
- if ( numTargets < 0 ) {
- throw new IOException("Mislabelled incoming datastream.");
- }
- DatanodeInfo targets[] = new DatanodeInfo[numTargets];
- for (int i = 0; i < targets.length; i++) {
- DatanodeInfo tmp = new DatanodeInfo();
- tmp.readFields(in);
- targets[i] = tmp;
- }
-
- DataChecksum checksum = DataChecksum.newDataChecksum( in );
+ // open a block receiver and check if the block does not exist
+ blockReceiver = new BlockReceiver(block, in,
+ s.getRemoteSocketAddress().toString());
//
- // Open local disk out
- //
- FSDataset.BlockWriteStreams streams = data.writeToBlock( block );
- out = new DataOutputStream(
- new BufferedOutputStream(streams.dataOut, BUFFER_SIZE));
- checksumOut = new DataOutputStream(
- new BufferedOutputStream(streams.checksumOut, BUFFER_SIZE));
-
- InetSocketAddress mirrorTarget = null;
- String mirrorNode = null;
- //
// Open network conn to backup machine, if
// appropriate
//
if (targets.length > 0) {
+ InetSocketAddress mirrorTarget = null;
+ String mirrorNode = null;
// Connect to backup machine
mirrorNode = targets[0].getName();
mirrorTarget = createSocketAddr(mirrorNode);
+ mirrorSock = new Socket();
try {
- mirrorSock = new Socket();
mirrorSock.connect(mirrorTarget, READ_TIMEOUT);
- mirrorSock.setSoTimeout(READ_TIMEOUT);
+ mirrorSock.setSoTimeout(numTargets*READ_TIMEOUT);
mirrorOut = new DataOutputStream(
new BufferedOutputStream(mirrorSock.getOutputStream(),
BUFFER_SIZE));
- mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
- //Copied from DFSClient.java!
+ // Write header: Copied from DFSClient.java!
mirrorOut.writeShort( DATA_TRANFER_VERSION );
mirrorOut.write( OP_WRITE_BLOCK );
mirrorOut.writeLong( block.getBlockId() );
@@ -886,196 +884,73 @@
for ( int i = 1; i < targets.length; i++ ) {
targets[i].write( mirrorOut );
}
- checksum.writeHeader( mirrorOut );
- myMetrics.replicatedBlocks(1);
- } catch (IOException ie) {
- if (mirrorOut != null) {
- LOG.info("Exception connecting to mirror " + mirrorNode
- + "\n" + StringUtils.stringifyException(ie));
- mirrorOut = null;
- }
+ } catch (IOException e) {
+ IOUtils.closeStream(mirrorOut);
+ mirrorOut = null;
+ IOUtils.closeSocket(mirrorSock);
+ mirrorSock = null;
}
}
-
- // XXX The following code is similar on both sides...
-
- int bytesPerChecksum = checksum.getBytesPerChecksum();
- int checksumSize = checksum.getChecksumSize();
- byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
- long blockLen = 0;
- long lastOffset = 0;
- long lastLen = 0;
- short status = -1;
- boolean headerWritten = false;
-
- while ( true ) {
- // Read one data chunk in each loop.
-
- long offset = lastOffset + lastLen;
- int len = in.readInt();
- if ( len < 0 || len > bytesPerChecksum ) {
- LOG.warn( "Got wrong length during writeBlock(" +
- block + ") from " + s.getRemoteSocketAddress() +
- " at offset " + offset + ": " + len +
- " expected <= " + bytesPerChecksum );
- status = OP_STATUS_ERROR;
- break;
- }
- in.readFully( buf, 0, len + checksumSize );
-
- if ( len > 0 && checksumSize > 0 ) {
- /*
- * Verification is not included in the initial design.
- * For now, it at least catches some bugs. Later, we can
- * include this after showing that it does not affect
- * performance much.
- */
- checksum.update( buf, 0, len );
-
- if ( ! checksum.compare( buf, len ) ) {
- throw new IOException( "Unexpected checksum mismatch " +
- "while writing " + block +
- " from " +
- s.getRemoteSocketAddress() );
- }
-
- checksum.reset();
- }
+ String mirrorAddr = (mirrorSock == null) ? null :
+ mirrorSock.getRemoteSocketAddress().toString();
+ blockReceiver.receiveBlock(mirrorOut, mirrorAddr, null);
+
+ /*
+ * Informing the name node could take a long long time! Should we wait
+ * till namenode is informed before responding with success to the
+ * client? For now we don't.
+ */
+ synchronized (receivedBlockList) {
+ receivedBlockList.add(block);
+ receivedBlockList.notifyAll();
+ }
- // First write to remote node before writing locally.
- if (mirrorOut != null) {
- try {
- mirrorOut.writeInt( len );
- mirrorOut.write( buf, 0, len + checksumSize );
- if (len == 0) {
- mirrorOut.flush();
- }
- } catch (IOException ioe) {
- LOG.info( "Exception writing to mirror " + mirrorNode +
- "\n" + StringUtils.stringifyException(ioe) );
- //
- // If stream-copy fails, continue
- // writing to disk. We shouldn't
- // interrupt client write.
- //
- mirrorOut = null;
- }
- }
+ String msg = "Received block " + block + " from " +
+ s.getRemoteSocketAddress();
+ /* read response from next target in the pipeline.
+ * ignore the response for now. Will fix it in HADOOP-1927
+ */
+ if( mirrorSock != null ) {
+ short result = OP_STATUS_ERROR;
+ DataInputStream mirrorIn = null;
try {
- if ( !headerWritten ) {
- // First DATA_CHUNK.
- // Write the header even if checksumSize is 0.
- checksumOut.writeShort( FSDataset.METADATA_VERSION );
- checksum.writeHeader( checksumOut );
- headerWritten = true;
- }
-
- if ( len > 0 ) {
- out.write( buf, 0, len );
- // Write checksum
- checksumOut.write( buf, len, checksumSize );
- myMetrics.wroteBytes( len );
- } else {
- /* Should we sync() files here? It can add many millisecs of
- * latency. We did not sync before HADOOP-1134 either.
- */
- out.close();
- out = null;
- checksumOut.close();
- checksumOut = null;
- }
-
- } catch (IOException iex) {
- checkDiskError(iex);
- throw iex;
+ mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
+ result = mirrorIn.readShort();
+ } catch (IOException ignored) {
+ } finally {
+ IOUtils.closeStream(mirrorIn);
}
-
- if ( len == 0 ) {
-
- // We already have one successful write here. Should we
- // wait for response from next target? We will skip for now.
- block.setNumBytes( blockLen );
-
- //Does this fsync()?
- data.finalizeBlock( block );
- myMetrics.wroteBlocks(1);
-
- status = OP_STATUS_SUCCESS;
-
- break;
- }
-
- if ( lastLen > 0 && lastLen != bytesPerChecksum ) {
- LOG.warn( "Got wrong length during writeBlock(" +
- block + ") from " + s.getRemoteSocketAddress() +
- " : " + " got " + lastLen + " instead of " +
- bytesPerChecksum );
- status = OP_STATUS_ERROR;
- break;
- }
-
- lastOffset = offset;
- lastLen = len;
- blockLen += len;
- }
- // done with reading the data.
-
- if ( status == OP_STATUS_SUCCESS ) {
- /* Informing the name node could take a long long time!
- Should we wait till namenode is informed before responding
- with success to the client? For now we don't.
- */
- synchronized ( receivedBlockList ) {
- receivedBlockList.add( block );
- receivedBlockList.notifyAll();
- }
-
- String msg = "Received block " + block + " from " +
- s.getInetAddress();
-
- if ( mirrorOut != null ) {
- //Wait for the remote reply
- mirrorOut.flush();
- short result = OP_STATUS_ERROR;
- try {
- result = mirrorIn.readShort();
- } catch ( IOException ignored ) {}
-
- msg += " and " + (( result != OP_STATUS_SUCCESS ) ?
- "failed to mirror to " : " mirrored to ") +
- mirrorTarget;
-
- mirrorOut = null;
- }
-
- LOG.info(msg);
- }
-
- if ( status >= 0 ) {
- try {
- reply.writeShort( status );
- reply.flush();
- } catch ( IOException ignored ) {}
+ msg += " and " + (( result != OP_STATUS_SUCCESS ) ?
+ "failed to mirror to " : " mirrored to ") +
+ mirrorAddr;
}
-
+
+ LOG.info(msg);
+ } catch (IOException ioe) {
+ opStatus = OP_STATUS_ERROR;
+ throw ioe;
} finally {
+ // send back reply
+ DataOutputStream reply = new DataOutputStream(s.getOutputStream());
try {
- if ( out != null )
- out.close();
- if ( checksumOut != null )
- checksumOut.close();
- if ( mirrorSock != null )
- mirrorSock.close();
- } catch (IOException iex) {
- shutdown();
- throw iex;
- }
+ reply.writeShort(opStatus);
+ reply.flush();
+ } catch (IOException ioe) {
+ LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress()
+ + "for writing block " + block );
+ LOG.warn(StringUtils.stringifyException(ioe));
+ }
+ // close all opened streams
+ IOUtils.closeStream(reply);
+ IOUtils.closeStream(mirrorOut);
+ IOUtils.closeSocket(mirrorSock);
+ IOUtils.closeStream(blockReceiver);
}
}
-
+
/**
* Reads the metadata and sends the data in one 'DATA_CHUNK'
* @param in
@@ -1113,173 +988,401 @@
}
}
}
+
+ /* An interface to throttle the block transfers */
+ private interface Throttler {
+ /** Given the numOfBytes sent/received since last time throttle was called,
+ * make the current thread sleep if I/O rate is too fast
+ * compared to the given bandwidth
+ *
+ * @param numOfBytes
+ * number of bytes sent/received since last time throttle was called
+ */
+ void throttle(int numOfBytes);
+ }
- /** sendBlock() is used to read block and its metadata and stream
- * the data to either a client or to another datanode.
- * If argument targets is null, then it is assumed to be replying
- * to a client request (OP_BLOCK_READ). Otherwise, we are replicating
- * to another datanode.
- *
- * returns total bytes reads, including crc.
- */
- long sendBlock(Socket sock, Block block,
- long startOffset, long length, DatanodeInfo targets[] )
- throws IOException {
- DataOutputStream out = new DataOutputStream(
- new BufferedOutputStream(sock.getOutputStream(),
- BUFFER_SIZE));
- RandomAccessFile blockInFile = null;
- DataInputStream blockIn = null;
- DataInputStream checksumIn = null;
- long totalRead = 0;
-
- /* XXX This will affect inter datanode transfers during
- * a CRC upgrade. There should not be any replication
- * during crc upgrade since we are in safe mode, right?
- */
- boolean corruptChecksumOk = targets == null;
-
- try {
- File blockFile = data.getBlockFile( block );
- blockInFile = new RandomAccessFile(blockFile, "r");
-
- File checksumFile = FSDataset.getMetaFile( blockFile );
- DataChecksum checksum = null;
-
- if ( !corruptChecksumOk || checksumFile.exists() ) {
- checksumIn = new DataInputStream(
- new BufferedInputStream(new FileInputStream(checksumFile),
- BUFFER_SIZE));
-
- //read and handle the common header here. For now just a version
- short version = checksumIn.readShort();
- if ( version != FSDataset.METADATA_VERSION ) {
- LOG.warn( "Wrong version (" + version +
- ") for metadata file for " + block + " ignoring ..." );
- }
- checksum = DataChecksum.newDataChecksum( checksumIn ) ;
- } else {
- LOG.warn( "Could not find metadata file for " + block );
- // This only decides the buffer size. Use BUFFER_SIZE?
- checksum = DataChecksum.newDataChecksum( DataChecksum.CHECKSUM_NULL,
- 16*1024 );
- }
+ private class BlockSender implements java.io.Closeable {
+ private Block block; // the block to read from
+ private DataInputStream blockIn; // data strean
+ private DataInputStream checksumIn; // checksum datastream
+ private DataChecksum checksum; // checksum stream
+ private long offset; // starting position to read
+ private long endOffset; // ending position
+ private byte buf[]; // buffer to store data read from the block file & crc
+ private int bytesPerChecksum; // chunk size
+ private int checksumSize; // checksum size
+ private boolean corruptChecksumOk; // if need to verify checksum
+ private boolean chunkOffsetOK; // if need to send chunk offset
+
+ private Throttler throttler;
+ private DataOutputStream out;
+
+ BlockSender(Block block, long startOffset, long length,
+ boolean corruptChecksumOk, boolean chunkOffsetOK) throws IOException {
+ RandomAccessFile blockInFile = null;
- int bytesPerChecksum = checksum.getBytesPerChecksum();
- int checksumSize = checksum.getChecksumSize();
-
- if (length < 0) {
- length = data.getLength(block);
- }
+ try {
+ this.block = block;
+ this.chunkOffsetOK = chunkOffsetOK;
+ this.corruptChecksumOk = corruptChecksumOk;
+ File blockFile = data.getBlockFile(block);
+ blockInFile = new RandomAccessFile(blockFile, "r");
+
+ File checksumFile = FSDataset.getMetaFile(blockFile);
+
+ if (!corruptChecksumOk || checksumFile.exists()) {
+ checksumIn = new DataInputStream(new BufferedInputStream(
+ new FileInputStream(checksumFile), BUFFER_SIZE));
+
+ // read and handle the common header here. For now just a version
+ short version = checksumIn.readShort();
+ if (version != FSDataset.METADATA_VERSION) {
+ LOG.warn("Wrong version (" + version + ") for metadata file for "
+ + block + " ignoring ...");
+ }
+ checksum = DataChecksum.newDataChecksum(checksumIn);
+ } else {
+ LOG.warn("Could not find metadata file for " + block);
+ // This only decides the buffer size. Use BUFFER_SIZE?
+ checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
+ 16 * 1024);
+ }
+
+ bytesPerChecksum = checksum.getBytesPerChecksum();
+ checksumSize = checksum.getChecksumSize();
- long endOffset = data.getLength( block );
- if ( startOffset < 0 || startOffset > endOffset ||
- (length + startOffset) > endOffset ) {
- String msg = " Offset " + startOffset + " and length " + length +
- " don't match block " + block + " ( blockLen " +
- endOffset + " )";
- LOG.warn( "sendBlock() : " + msg );
- if ( targets != null ) {
+ if (length < 0) {
+ length = data.getLength(block);
+ }
+
+ endOffset = data.getLength(block);
+ if (startOffset < 0 || startOffset > endOffset
+ || (length + startOffset) > endOffset) {
+ String msg = " Offset " + startOffset + " and length " + length
+ + " don't match block " + block + " ( blockLen " + endOffset + " )";
+ LOG.warn("sendBlock() : " + msg);
throw new IOException(msg);
- } else {
- out.writeShort( OP_STATUS_ERROR_INVALID );
- return totalRead;
}
+
+ buf = new byte[bytesPerChecksum + checksumSize];
+ offset = (startOffset - (startOffset % bytesPerChecksum));
+ if (length >= 0) {
+ // Make sure endOffset points to end of a checksumed chunk.
+ long tmpLen = startOffset + length + (startOffset - offset);
+ if (tmpLen % bytesPerChecksum != 0) {
+ tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+ }
+ if (tmpLen < endOffset) {
+ endOffset = tmpLen;
+ }
+ }
+
+ // seek to the right offsets
+ if (offset > 0) {
+ long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
+ blockInFile.seek(offset);
+ if (checksumSkip > 0) {
+ // Should we use seek() for checksum file as well?
+ IOUtils.skipFully(checksumIn, checksumSkip);
+ }
+ }
+
+ blockIn = new DataInputStream(new BufferedInputStream(
+ new FileInputStream(blockInFile.getFD()), BUFFER_SIZE));
+ } catch (IOException ioe) {
+ IOUtils.closeStream(this);
+ IOUtils.closeStream(blockInFile);
+ throw ioe;
}
+ }
- byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
- long offset = (startOffset - (startOffset % bytesPerChecksum));
- if ( length >= 0 ) {
- // Make sure endOffset points to end of a checksumed chunk.
- long tmpLen = startOffset + length + (startOffset - offset);
- if ( tmpLen % bytesPerChecksum != 0 ) {
- tmpLen += ( bytesPerChecksum - tmpLen % bytesPerChecksum );
+ // close opened files
+ public void close() throws IOException {
+ IOException ioe = null;
+ // close checksum file
+ if(checksumIn!=null) {
+ try {
+ checksumIn.close();
+ } catch (IOException e) {
+ ioe = e;
}
- if ( tmpLen < endOffset ) {
- endOffset = tmpLen;
+ checksumIn = null;
+ }
+ // close data file
+ if(blockIn!=null) {
+ try {
+ blockIn.close();
+ } catch (IOException e) {
+ ioe = e;
}
+ blockIn = null;
}
+ // throw IOException if there is any
+ if(ioe!= null) {
+ throw ioe;
+ }
+ }
+
+ private int sendChunk()
+ throws IOException {
+ int len = (int) Math.min(endOffset - offset, bytesPerChecksum);
+ if (len == 0)
+ return 0;
+ blockIn.readFully(buf, 0, len);
- // seek to the right offsets
- if ( offset > 0 ) {
- long checksumSkip = ( offset / bytesPerChecksum ) * checksumSize ;
- blockInFile.seek(offset);
- if (checksumSkip > 0) {
- //Should we use seek() for checksum file as well?
- IOUtils.skipFully(checksumIn, checksumSkip);
+ if (checksumSize > 0 && checksumIn != null) {
+ try {
+ checksumIn.readFully(buf, len, checksumSize);
+ } catch (IOException e) {
+ LOG.warn(" Could not read checksum for data at offset " + offset
+ + " for block " + block + " got : "
+ + StringUtils.stringifyException(e));
+ IOUtils.closeStream(checksumIn);
+ checksumIn = null;
+ if (corruptChecksumOk) {
+ // Just fill the array with zeros.
+ Arrays.fill(buf, len, len + checksumSize, (byte) 0);
+ } else {
+ throw e;
+ }
}
}
-
- blockIn = new DataInputStream(new BufferedInputStream(
- new FileInputStream(blockInFile.getFD()),
- BUFFER_SIZE));
-
- if ( targets != null ) {
+
+ out.writeInt(len);
+ out.write(buf, 0, len + checksumSize);
+
+ if (throttler != null) { // rebalancing so throttle
+ throttler.throttle(len + checksumSize);
+ }
+
+ return len;
+ }
+
+ /**
+ * sendBlock() is used to read block and its metadata and stream the data to
+ * either a client or to another datanode.
+ *
+ * @param out stream to which the block is written to
+ * returns total bytes reads, including crc.
+ */
+ long sendBlock(DataOutputStream out, Throttler throttler)
+ throws IOException {
+ if( out == null ) {
+ throw new IOException( "out stream is null" );
+ }
+ this.out = out;
+ this.throttler = throttler;
+
+ long totalRead = 0;
+ try {
+ checksum.writeHeader(out);
+ if ( chunkOffsetOK ) {
+ out.writeLong( offset );
+ }
+
+ while (endOffset > offset) {
+ // Write one data chunk per loop.
+ long len = sendChunk();
+ offset += len;
+ totalRead += len + checksumSize;
+ }
+ out.writeInt(0); // mark the end of block
+ out.flush();
+ } finally {
+ close();
+ }
+
+ return totalRead;
+ }
+ }
+
+ /* A class that receives a block and wites to its own disk, meanwhile
+ * may copies it to another site. If a throttler is provided,
+ * streaming throttling is also supported.
+ * */
+ private class BlockReceiver implements java.io.Closeable {
+ private Block block; // the block to receive
+ private DataInputStream in = null; // from where data are read
+ private DataChecksum checksum; // from where chunks of a block can be read
+ private DataOutputStream out = null; // to block file at local disk
+ private DataOutputStream checksumOut = null; // to crc file at local disk
+ private int bytesPerChecksum;
+ private int checksumSize;
+ private byte buf[];
+ private long offset;
+ final private String inAddr;
+ private String mirrorAddr;
+ private DataOutputStream mirrorOut;
+ private Throttler throttler;
+ private int lastLen = -1;
+ private int curLen = -1;
+
+ BlockReceiver(Block block, DataInputStream in, String inAddr)
+ throws IOException {
+ try{
+ this.block = block;
+ this.in = in;
+ this.inAddr = inAddr;
+ this.checksum = DataChecksum.newDataChecksum(in);
+ this.bytesPerChecksum = checksum.getBytesPerChecksum();
+ this.checksumSize = checksum.getChecksumSize();
+ this.buf = new byte[bytesPerChecksum + checksumSize];
+
//
- // Header info
+ // Open local disk out
//
- out.writeShort( DATA_TRANFER_VERSION );
- out.writeByte( OP_WRITE_BLOCK );
- out.writeLong( block.getBlockId() );
- out.writeInt(targets.length-1);
- for (int i = 1; i < targets.length; i++) {
- targets[i].write( out );
+ FSDataset.BlockWriteStreams streams = data.writeToBlock(block);
+ this.out = new DataOutputStream(new BufferedOutputStream(
+ streams.dataOut, BUFFER_SIZE));
+ this.checksumOut = new DataOutputStream(new BufferedOutputStream(
+ streams.checksumOut, BUFFER_SIZE));
+ } catch(IOException ioe) {
+ IOUtils.closeStream(this);
+ throw ioe;
+ }
+ }
+
+ // close files
+ public void close() throws IOException {
+ IOException ioe = null;
+ // close checksum file
+ try {
+ if (checksumOut != null) {
+ checksumOut.close();
+ checksumOut = null;
}
- } else {
- out.writeShort( OP_STATUS_SUCCESS );
+ } catch(IOException e) {
+ ioe = e;
+ }
+ // close block file
+ try {
+ if (out != null) {
+ out.close();
+ out = null;
+ }
+ } catch (IOException e) {
+ ioe = e;
+ }
+ // disk check
+ if(ioe != null) {
+ checkDiskError(ioe);
+ throw ioe;
+ }
+ }
+
+ /* receive a chunk: write it to disk & mirror it to another stream */
+ private void receiveChunk( int len ) throws IOException {
+ if (len <= 0 || len > bytesPerChecksum) {
+ throw new IOException("Got wrong length during writeBlock(" + block
+ + ") from " + inAddr + " at offset " + offset + ": " + len
+ + " expected <= " + bytesPerChecksum);
}
- checksum.writeHeader( out );
-
- if ( targets == null ) {
- out.writeLong( offset );
+ if (lastLen > 0 && lastLen != bytesPerChecksum) {
+ throw new IOException("Got wrong length during receiveBlock(" + block
+ + ") from " + inAddr + " : " + " got " + lastLen + " instead of "
+ + bytesPerChecksum);
}
-
- while ( endOffset >= offset ) {
- // Write one data chunk per loop.
- int len = (int) Math.min( endOffset - offset, bytesPerChecksum );
- if ( len > 0 ) {
- blockIn.readFully( buf, 0, len );
- totalRead += len;
-
- if ( checksumSize > 0 && checksumIn != null ) {
- try {
- checksumIn.readFully( buf, len, checksumSize );
- totalRead += checksumSize;
- } catch ( IOException e ) {
- LOG.warn( " Could not read checksum for data at offset " +
- offset + " for block " + block + " got : " +
- StringUtils.stringifyException(e) );
- IOUtils.closeStream( checksumIn );
- checksumIn = null;
- if ( corruptChecksumOk ) {
- // Just fill the array with zeros.
- Arrays.fill( buf, len, len + checksumSize, (byte)0 );
- } else {
- throw e;
- }
- }
- }
- }
- out.writeInt( len );
- out.write( buf, 0, len + checksumSize );
-
- if ( offset == endOffset ) {
- out.flush();
- // We are not waiting for response from target.
- break;
+ lastLen = curLen;
+ curLen = len;
+
+ in.readFully(buf, 0, len + checksumSize);
+
+ /*
+ * Verification is not included in the initial design. For now, it at
+ * least catches some bugs. Later, we can include this after showing that
+ * it does not affect performance much.
+ */
+ checksum.update(buf, 0, len);
+
+ if (!checksum.compare(buf, len)) {
+ throw new IOException("Unexpected checksum mismatch "
+ + "while writing " + block + " from " + inAddr);
+ }
+
+ checksum.reset();
+
+ // First write to remote node before writing locally.
+ if (mirrorOut != null) {
+ try {
+ mirrorOut.writeInt(len);
+ mirrorOut.write(buf, 0, len + checksumSize);
+ } catch (IOException ioe) {
+ LOG.info("Exception writing to mirror " + mirrorAddr + "\n"
+ + StringUtils.stringifyException(ioe));
+ //
+ // If stream-copy fails, continue
+ // writing to disk. We shouldn't
+ // interrupt client write.
+ //
+ mirrorOut = null;
}
- offset += len;
}
- } finally {
- IOUtils.closeStream( blockInFile );
- IOUtils.closeStream( checksumIn );
- IOUtils.closeStream( blockIn );
- IOUtils.closeStream( out );
+
+ try {
+ out.write(buf, 0, len);
+ // Write checksum
+ checksumOut.write(buf, len, checksumSize);
+ myMetrics.wroteBytes(len);
+ } catch (IOException iex) {
+ checkDiskError(iex);
+ throw iex;
+ }
+
+ if (throttler != null) { // throttle I/O
+ throttler.throttle(len + checksumSize);
+ }
+ }
+
+ public void receiveBlock(DataOutputStream mirrorOut,
+ String mirrorAddr, Throttler throttler) throws IOException {
+
+ this.mirrorOut = mirrorOut;
+ this.mirrorAddr = mirrorAddr;
+ this.throttler = throttler;
+
+ /*
+ * We need an estimate for block size to check if the disk partition has
+ * enough space. For now we just increment FSDataset.reserved by
+ * configured dfs.block.size Other alternative is to include the block
+ * size in the header sent by DFSClient.
+ */
+
+ try {
+ // write data chunk header
+ checksumOut.writeShort(FSDataset.METADATA_VERSION);
+ checksum.writeHeader(checksumOut);
+ if (mirrorOut != null) {
+ checksum.writeHeader(mirrorOut);
+ this.mirrorAddr = mirrorAddr;
+ }
+
+ int len = in.readInt();
+ while (len != 0) {
+ receiveChunk( len );
+ offset += len;
+ len = in.readInt();
+ }
+
+ // flush the mirror out
+ if (mirrorOut != null) {
+ mirrorOut.writeInt(0); // mark the end of the stream
+ mirrorOut.flush();
+ }
+
+ // close the block/crc files
+ close();
+
+ // Finalize the block. Does this fsync()?
+ block.setNumBytes(offset);
+ data.finalizeBlock(block);
+ myMetrics.wroteBlocks(1);
+ } catch (IOException ioe) {
+ IOUtils.closeStream(this);
+ throw ioe;
+ }
}
-
- return totalRead;
}
/**
@@ -1305,21 +1408,40 @@
public void run() {
xmitsInProgress++;
Socket sock = null;
+ DataOutputStream out = null;
+ BlockSender blockSender = null;
try {
InetSocketAddress curTarget =
createSocketAddr(targets[0].getName());
sock = new Socket();
sock.connect(curTarget, READ_TIMEOUT);
- sock.setSoTimeout(READ_TIMEOUT);
- sendBlock( sock, b, 0, -1, targets );
- LOG.info( "Transmitted block " + b + " to " + curTarget );
-
- } catch ( IOException ie ) {
- LOG.warn( "Failed to transfer " + b + " to " +
- targets[0].getName() + " got " +
- StringUtils.stringifyException( ie ) );
+ sock.setSoTimeout(targets.length*READ_TIMEOUT);
+
+ out = new DataOutputStream(new BufferedOutputStream(
+ sock.getOutputStream(), BUFFER_SIZE));
+ blockSender = new BlockSender(b, 0, -1, false, false);
+
+ //
+ // Header info
+ //
+ out.writeShort(DATA_TRANFER_VERSION);
+ out.writeByte(OP_WRITE_BLOCK);
+ out.writeLong(b.getBlockId());
+ // write targets
+ out.writeInt(targets.length - 1);
+ for (int i = 1; i < targets.length; i++) {
+ targets[i].write(out);
+ }
+ // send data & checksum
+ blockSender.sendBlock(out, null);
+ LOG.info("Transmitted block " + b + " to " + curTarget);
+ } catch (IOException ie) {
+ LOG.warn("Failed to transfer " + b + " to " + targets[0].getName()
+ + " got " + StringUtils.stringifyException(ie));
} finally {
+ IOUtils.closeStream(blockSender);
+ IOUtils.closeStream(out);
IOUtils.closeSocket(sock);
xmitsInProgress--;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=579716&r1=579715&r2=579716&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Wed Sep 26 09:58:48 2007
@@ -97,7 +97,13 @@
* This should change when serialization of DatanodeInfo, not just
* when protocol changes. It is not very obvious.
*/
- public static final int DATA_TRANFER_VERSION = 5; //Should it be 1?
+ /* Version 6:
+ * 0 marks the end of a block not an EMPTY_CHUNK
+ * OP_READ_BLOCK: return OP_STATUS_ERROR if received an invalid block id
+ * return OP_STATUS_ERROR if received an invalid length
+ * OP_WRITE_BLOCK: return OP_STATUS_ERROR if illegal bytesPerChecksum
+ */
+ public static final int DATA_TRANFER_VERSION = 6; //Should it be 1?
// Return codes for file create
public static final int OPERATION_FAILED = 0;
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java?rev=579716&r1=579715&r2=579716&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java Wed Sep 26 09:58:48 2007
@@ -171,7 +171,9 @@
byteBuf.put((byte)DataChecksum.CHECKSUM_CRC32);
byteBuf.putInt(-1-random.nextInt(oneMil));
- sendRecvData("wrong bytesPerChecksum while writing", true);
+ recvByteBuf.position(0);
+ recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR);
+ sendRecvData("wrong bytesPerChecksum while writing", false);
byteBuf.putInt(checksumPos+1, 512);
byteBuf.putInt(targetPos, -1-random.nextInt(oneMil));
@@ -204,14 +206,11 @@
byteBuf.putLong(0L);
int lenPos = byteBuf.position();
byteBuf.putLong(fileLen);
- /* We should change DataNode to return ERROR_INVALID instead of closing
- * the connection.
- */
- sendRecvData("Wrong block ID for read", true);
+ recvByteBuf.position(0);
+ recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR);
+ sendRecvData("Wrong block ID for read", false);
byteBuf.putLong(blockPos, firstBlock.getBlockId());
- recvByteBuf.position(0);
- recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR_INVALID);
byteBuf.putLong(startOffsetPos, -1-random.nextInt(oneMil));
sendRecvData("Negative start-offset for read", false);
@@ -224,7 +223,7 @@
byteBuf.putLong(lenPos, -1-random.nextInt(oneMil));
sendRecvData("Negative length for read", false);
- recvByteBuf.putShort(0, (short)FSConstants.OP_STATUS_ERROR_INVALID);
+ recvByteBuf.putShort(0, (short)FSConstants.OP_STATUS_ERROR);
byteBuf.putLong(lenPos, fileLen+1);
sendRecvData("Wrong length for read", false);
byteBuf.putLong(lenPos, fileLen);