You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/11/09 21:25:31 UTC
svn commit: r473062 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/ipc/Server.java
src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java
Author: cutting
Date: Thu Nov 9 12:25:30 2006
New Revision: 473062
URL: http://svn.apache.org/viewvc?view=rev&rev=473062
Log:
HADOOP-637. Fix a memory leak in IPC server. Contributed by Raghu.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=473062&r1=473061&r2=473062
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Nov 9 12:25:30 2006
@@ -34,6 +34,10 @@
10. HADOOP-694. Fix a NullPointerException in jobtracker.
(Mahadev Konar via cutting)
+11. HADOOP-637. Fix a memory leak in the IPC server. Direct buffers
+ are not collected like normal buffers, and provided little
+ advantage. (Raghu Angadi via cutting)
+
Release 0.8.0 - 2006-11-03
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?view=diff&rev=473062&r1=473061&r2=473062
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Thu Nov 9 12:25:30 2006
@@ -366,11 +366,11 @@
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
- this.dataLengthBuffer = null;
+ this.dataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket();
this.out = new DataOutputStream
(new BufferedOutputStream(
- this.channelOut = new SocketChannelOutputStream(channel, 4096)));
+ this.channelOut = new SocketChannelOutputStream( channel )));
InetAddress addr = socket.getInetAddress();
if (addr == null) {
this.hostAddress = "*Unknown*";
@@ -410,31 +410,27 @@
public int readAndProcess() throws IOException, InterruptedException {
int count = -1;
- if (dataLengthBuffer == null)
- dataLengthBuffer = ByteBuffer.allocateDirect(4);
if (dataLengthBuffer.remaining() > 0) {
- count = channel.read(dataLengthBuffer);
- if (count < 0) return count;
- if (dataLengthBuffer.remaining() == 0) {
- dataLengthBuffer.flip();
- dataLength = dataLengthBuffer.getInt();
- data = ByteBuffer.allocateDirect(dataLength);
- }
- //return count;
+ count = channel.read(dataLengthBuffer);
+ if ( count < 0 || dataLengthBuffer.remaining() > 0 )
+ return count;
+ dataLengthBuffer.flip();
+ dataLength = dataLengthBuffer.getInt();
+ data = ByteBuffer.allocate(dataLength);
}
count = channel.read(data);
if (data.remaining() == 0) {
data.flip();
processData();
- data = dataLengthBuffer = null;
+ dataLengthBuffer.flip();
+ data = null;
}
return count;
}
private void processData() throws IOException, InterruptedException {
- byte[] bytes = new byte[dataLength];
- data.get(bytes);
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+ DataInputStream dis =
+ new DataInputStream(new ByteArrayInputStream( data.array() ));
int id = dis.readInt(); // try to read an id
if (LOG.isDebugEnabled())
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java?view=diff&rev=473062&r1=473061&r2=473062
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java Thu Nov 9 12:25:30 2006
@@ -42,10 +42,10 @@
/** Constructor.
*
*/
- public SocketChannelOutputStream(SocketChannel channel, int bufferSize)
+ public SocketChannelOutputStream(SocketChannel channel)
{
this.channel = channel;
- buffer = ByteBuffer.allocateDirect(bufferSize);
+ buffer = ByteBuffer.allocate(8); // only for small writes
}
/* ------------------------------------------------------------------------------- */
@@ -85,16 +85,8 @@
*/
public void write(byte[] buf, int offset, int length) throws IOException
{
- if (length > buffer.capacity())
- flush = ByteBuffer.wrap(buf,offset,length);
- else
- {
- buffer.clear();
- buffer.put(buf,offset,length);
- buffer.flip();
- flush = buffer;
- }
- flushBuffer();
+ flush = ByteBuffer.wrap(buf,offset,length);
+ flushBuffer();
}
/* ------------------------------------------------------------------------------- */
@@ -103,16 +95,8 @@
*/
public void write(byte[] buf) throws IOException
{
- if (buf.length > buffer.capacity())
- flush = ByteBuffer.wrap(buf);
- else
- {
- buffer.clear();
- buffer.put(buf);
- buffer.flip();
- flush = buffer;
- }
- flushBuffer();
+ flush = ByteBuffer.wrap(buf);
+ flushBuffer();
}
@@ -144,6 +128,7 @@
}
}
}
+ flush = null;
}
/* ------------------------------------------------------------------------------- */