You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/08/10 20:19:43 UTC
svn commit: r564704 - in /lucene/hadoop/branches/branch-0.14: CHANGES.txt
src/java/org/apache/hadoop/dfs/DFSClient.java
src/java/org/apache/hadoop/dfs/DataNode.java
src/java/org/apache/hadoop/fs/FileUtil.java
Author: dhruba
Date: Fri Aug 10 11:19:40 2007
New Revision: 564704
URL: http://svn.apache.org/viewvc?view=rev&rev=564704
Log:
Remove performance regression introduced by Block CRC.
(Raghu Angadi via dhruba)
merge -c 564687 from trunk to 0.14 release.
Modified:
lucene/hadoop/branches/branch-0.14/CHANGES.txt
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java
Modified: lucene/hadoop/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/CHANGES.txt?view=diff&rev=564704&r1=564703&r2=564704
==============================================================================
--- lucene/hadoop/branches/branch-0.14/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.14/CHANGES.txt Fri Aug 10 11:19:40 2007
@@ -457,6 +457,9 @@
146. HADOOP-1666. FsShell object can be used for multiple fs commands.
Contributed by Dhruba Borthakur.
+147. HADOOP-1654. Remove performance regression introduced by Block CRC.
+ (Raghu Angadi via dhruba)
+
Release 0.13.0 - 2007-06-08
1. HADOOP-1047. Fix TestReplication to succeed more reliably.
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=564704&r1=564703&r2=564704
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DFSClient.java Fri Aug 10 11:19:40 2007
@@ -1638,8 +1638,12 @@
int checksumSize = checksum.getChecksumSize();
byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
- InputStream in = (bytesLeft > 0) ?
- new FileInputStream(backupFile) : null;
+ InputStream in = null;
+ if ( bytesLeft > 0 ) {
+ in = new BufferedInputStream(new FileInputStream(backupFile),
+ buffersize);
+ }
+
try {
while ( bytesLeft >= 0 ) {
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=564704&r1=564703&r2=564704
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DataNode.java Fri Aug 10 11:19:40 2007
@@ -77,13 +77,6 @@
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
/**
- * A buffer size small enough that read/writes while reading headers
- * don't result in multiple io calls but reading larger amount of data
- * like one checksum size does not result in extra copy.
- */
- public static final int SMALL_HDR_BUFFER_SIZE = 64;
-
- /**
* Util method to build socket addr from either:
<host>:<port>
* <fs>://<host>:<port>/<path>
@@ -718,7 +711,7 @@
public void run() {
try {
DataInputStream in = new DataInputStream(
- new BufferedInputStream(s.getInputStream(), SMALL_HDR_BUFFER_SIZE));
+ new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
short version = in.readShort();
if ( version != DATA_TRANFER_VERSION ) {
throw new IOException( "Version Mismatch" );
@@ -827,8 +820,10 @@
// Open local disk out
//
FSDataset.BlockWriteStreams streams = data.writeToBlock( block );
- out = new DataOutputStream(streams.dataOut);
- checksumOut = new DataOutputStream(streams.checksumOut);
+ out = new DataOutputStream(
+ new BufferedOutputStream(streams.dataOut, BUFFER_SIZE));
+ checksumOut = new DataOutputStream(
+ new BufferedOutputStream(streams.checksumOut, BUFFER_SIZE));
InetSocketAddress mirrorTarget = null;
String mirrorNode = null;
@@ -846,7 +841,7 @@
mirrorSock.setSoTimeout(READ_TIMEOUT);
mirrorOut = new DataOutputStream(
new BufferedOutputStream(mirrorSock.getOutputStream(),
- SMALL_HDR_BUFFER_SIZE));
+ BUFFER_SIZE));
mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
//Copied from DFSClient.java!
mirrorOut.writeShort( DATA_TRANFER_VERSION );
@@ -918,6 +913,9 @@
try {
mirrorOut.writeInt( len );
mirrorOut.write( buf, 0, len + checksumSize );
+ if (len == 0) {
+ mirrorOut.flush();
+ }
} catch (IOException ioe) {
LOG.info( "Exception writing to mirror " + mirrorNode +
"\n" + StringUtils.stringifyException(ioe) );
@@ -1092,15 +1090,14 @@
long sendBlock(Socket sock, Block block,
long startOffset, long length, DatanodeInfo targets[] )
throws IOException {
- // May be we should just use io.file.buffer.size.
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(sock.getOutputStream(),
- SMALL_HDR_BUFFER_SIZE));
- DataInputStream in = null;
+ BUFFER_SIZE));
+ RandomAccessFile blockInFile = null;
+ DataInputStream blockIn = null;
DataInputStream checksumIn = null;
long totalRead = 0;
-
/* XXX This will affect inter datanode transfers during
* a CRC upgrade. There should not be any replication
* during crc upgrade since we are in safe mode, right?
@@ -1109,13 +1106,15 @@
try {
File blockFile = data.getBlockFile( block );
- in = new DataInputStream( new FileInputStream( blockFile ) );
+ blockInFile = new RandomAccessFile(blockFile, "r");
File checksumFile = FSDataset.getMetaFile( blockFile );
DataChecksum checksum = null;
if ( !corruptChecksumOk || checksumFile.exists() ) {
- checksumIn = new DataInputStream( new FileInputStream(checksumFile) );
+ checksumIn = new DataInputStream(
+ new BufferedInputStream(new FileInputStream(checksumFile),
+ BUFFER_SIZE));
//read and handle the common header here. For now just a version
short version = checksumIn.readShort();
@@ -1169,17 +1168,17 @@
// seek to the right offsets
if ( offset > 0 ) {
long checksumSkip = ( offset / bytesPerChecksum ) * checksumSize ;
- /* XXX skip() could be very inefficent. Should be seek().
- * at least skipFully
- */
- if ( in.skip( offset ) != offset ||
- ( checksumSkip > 0 &&
- checksumIn.skip( checksumSkip ) != checksumSkip ) ) {
- throw new IOException( "Could not seek to right position while " +
- "reading for " + block );
+ blockInFile.seek(offset);
+ if (checksumSkip > 0) {
+ //Should we use seek() for checksum file as well?
+ FileUtil.skipFully(checksumIn, checksumSkip);
}
}
+ blockIn = new DataInputStream(new BufferedInputStream(
+ new FileInputStream(blockInFile.getFD()),
+ BUFFER_SIZE));
+
if ( targets != null ) {
//
// Header info
@@ -1205,7 +1204,7 @@
// Write one data chunk per loop.
int len = (int) Math.min( endOffset - offset, bytesPerChecksum );
if ( len > 0 ) {
- in.readFully( buf, 0, len );
+ blockIn.readFully( buf, 0, len );
totalRead += len;
if ( checksumSize > 0 && checksumIn != null ) {
@@ -1239,8 +1238,9 @@
offset += len;
}
} finally {
+ FileUtil.closeStream( blockInFile );
FileUtil.closeStream( checksumIn );
- FileUtil.closeStream( in );
+ FileUtil.closeStream( blockIn );
FileUtil.closeStream( out );
}
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=564704&r1=564703&r2=564704
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java Fri Aug 10 11:19:40 2007
@@ -279,21 +279,6 @@
return dst;
}
- private static File checkDest(String srcName, File dst)
- throws IOException {
- if (dst.exists()) {
- if (!dst.isDirectory()) {
- throw new IOException("Target " + dst + " already exists");
- } else {
- dst = new File(dst, srcName);
- if (dst.exists()) {
- throw new IOException("Target " + dst + " already exists");
- }
- }
- }
- return dst;
- }
-
/**
* This class is only used on windows to invoke the cygpath command.
*/
@@ -532,29 +517,32 @@
}
}
- public static void closeSocket( Socket sock ) {
- // avoids try { close() } dance
- if ( sock != null ) {
- try {
- sock.close();
- } catch ( IOException ignored ) {
+ public static void skipFully( InputStream in, long len ) throws IOException {
+ long toSkip = len;
+ while ( toSkip > 0 ) {
+ long ret = in.skip( toSkip );
+ if ( ret < 0 ) {
throw new IOException( "Premature EOF from inputStream");
}
+ toSkip -= ret;
}
}
- public static void closeStream( InputStream in ) {
+
+ public static void closeSocket( Socket sock ) {
// avoids try { close() } dance
- if ( in != null ) {
+ if ( sock != null ) {
try {
- in.close();
+ sock.close();
} catch ( IOException ignored ) {
}
}
}
- public static void closeStream( OutputStream out ) {
+
+ public static void closeStream(Closeable closeable ) {
// avoids try { close() } dance
- if ( out != null ) {
+ if ( closeable != null ) {
try {
- out.close();
+ closeable.close();
} catch ( IOException ignored ) {
}
}