You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by ed...@apache.org on 2008/07/22 01:26:38 UTC
svn commit: r678596 -
/mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
Author: edeoliveira
Date: Mon Jul 21 16:26:38 2008
New Revision: 678596
URL: http://svn.apache.org/viewvc?rev=678596&view=rev
Log:
DIRMINA-519 enhanced concurrency / fixed write method
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java?rev=678596&r1=678595&r2=678596&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java Mon Jul 21 16:26:38 2008
@@ -20,8 +20,8 @@
package org.apache.mina.filter.buffer;
import java.io.BufferedOutputStream;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.filterchain.IoFilter;
@@ -47,8 +47,9 @@
* @since MINA 2.0.0-M2
*/
public final class BufferedWriteFilter extends IoFilterAdapter {
- private final Logger logger = LoggerFactory.getLogger(BufferedWriteFilter.class);
-
+ private final Logger logger = LoggerFactory
+ .getLogger(BufferedWriteFilter.class);
+
/**
* Default buffer size value in bytes.
*/
@@ -63,7 +64,7 @@
* The map that matches an {@link IoSession} and its {@link IoBuffer}
* buffer.
*/
- protected Map<IoSession, IoBuffer> buffersMap = new HashMap<IoSession, IoBuffer>();
+ private final ConcurrentMap<IoSession, IoBuffer> buffersMap;
/**
* Default constructor. Sets buffer size to {@link #DEFAULT_BUFFER_SIZE}
@@ -81,6 +82,7 @@
public BufferedWriteFilter(int bufferSize) {
super();
this.bufferSize = bufferSize;
+ buffersMap = new ConcurrentHashMap<IoSession, IoBuffer>();
}
/**
@@ -126,14 +128,10 @@
* @param data the data to buffer
*/
private void write(IoSession session, IoBuffer data) {
- IoBuffer dest = null;
- synchronized (buffersMap) {
+ IoBuffer dest = buffersMap.get(session);
+ if (dest == null) {
+ buffersMap.putIfAbsent(session, IoBuffer.allocate(bufferSize));
dest = buffersMap.get(session);
- if (dest == null) {
- // Enforce the creation of a non-expandable buffer
- dest = IoBuffer.allocate(bufferSize).setAutoExpand(false);
- buffersMap.put(session, dest);
- }
}
write(session, data, dest);
@@ -150,29 +148,28 @@
* @param buf the buffer where data will be temporarily written
*/
private void write(IoSession session, IoBuffer data, IoBuffer buf) {
- synchronized (buf) {
- try {
- int len = data.remaining();
- if (len >= buf.capacity()) {
- /*
- * If the request length exceeds the size of the output buffer,
- * flush the output buffer and then write the data directly.
- */
- NextFilter nextFilter = session.getFilterChain()
- .getNextFilter(this);
- internalFlush(nextFilter, session, buf);
- nextFilter.filterWrite(session,
- new DefaultWriteRequest(buf));
- return;
- }
- if (len > (buf.limit() - buf.position())) {
- internalFlush(session.getFilterChain().getNextFilter(this),
- session, buf);
- }
+ try {
+ int len = data.remaining();
+ if (len >= buf.capacity()) {
+ /*
+ * If the request length exceeds the size of the output buffer,
+ * flush the output buffer and then write the data directly.
+ */
+ NextFilter nextFilter = session.getFilterChain().getNextFilter(
+ this);
+ internalFlush(nextFilter, session, buf);
+ nextFilter.filterWrite(session, new DefaultWriteRequest(data));
+ return;
+ }
+ if (len > (buf.limit() - buf.position())) {
+ internalFlush(session.getFilterChain().getNextFilter(this),
+ session, buf);
+ }
+ synchronized (buf) {
buf.put(data);
- } catch (Throwable e) {
- session.getFilterChain().fireExceptionCaught(e);
}
+ } catch (Throwable e) {
+ session.getFilterChain().fireExceptionCaught(e);
}
}
@@ -181,16 +178,17 @@
*
* @param nextFilter the {@link NextFilter} of this filter
* @param session the session where buffer will be written
- * @param data the data to write
+ * @param buf the data to write
* @throws Exception if a write operation fails
*/
private void internalFlush(NextFilter nextFilter, IoSession session,
- IoBuffer data) throws Exception {
- if (data != null) {
- data.flip();
- logger.debug("Flushing buffer: {}", data);
- nextFilter.filterWrite(session, new DefaultWriteRequest(data.duplicate()));
- data.clear();
+ IoBuffer buf) throws Exception {
+ synchronized (buf) {
+ buf.flip();
+ logger.debug("Flushing buffer: {}", buf);
+ nextFilter.filterWrite(session, new DefaultWriteRequest(buf
+ .duplicate()));
+ buf.clear();
}
}
@@ -201,13 +199,8 @@
*/
public void flush(IoSession session) {
try {
- IoBuffer data = null;
- synchronized (session) {
- data = buffersMap.get(session);
- }
internalFlush(session.getFilterChain().getNextFilter(this),
- session, data);
-
+ session, buffersMap.get(session));
} catch (Throwable e) {
session.getFilterChain().fireExceptionCaught(e);
}
@@ -219,12 +212,10 @@
*
* @param session the session we operate on
*/
- private void clear(IoSession session) {
- synchronized (session) {
- IoBuffer buf = buffersMap.remove(session);
- if (buf != null) {
- buf.free();
- }
+ private void free(IoSession session) {
+ IoBuffer buf = buffersMap.remove(session);
+ if (buf != null) {
+ buf.free();
}
}
@@ -234,7 +225,7 @@
@Override
public void exceptionCaught(NextFilter nextFilter, IoSession session,
Throwable cause) throws Exception {
- clear(session);
+ free(session);
}
/**
@@ -243,6 +234,6 @@
@Override
public void sessionClosed(NextFilter nextFilter, IoSession session)
throws Exception {
- clear(session);
+ free(session);
}
}
\ No newline at end of file