You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by ed...@apache.org on 2008/07/22 01:26:38 UTC

svn commit: r678596 - /mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java

Author: edeoliveira
Date: Mon Jul 21 16:26:38 2008
New Revision: 678596

URL: http://svn.apache.org/viewvc?rev=678596&view=rev
Log:
DIRMINA-519 enhanced concurrency / fixed write method

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java?rev=678596&r1=678595&r2=678596&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java Mon Jul 21 16:26:38 2008
@@ -20,8 +20,8 @@
 package org.apache.mina.filter.buffer;
 
 import java.io.BufferedOutputStream;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.filterchain.IoFilter;
@@ -47,8 +47,9 @@
  * @since MINA 2.0.0-M2
  */
 public final class BufferedWriteFilter extends IoFilterAdapter {
-    private final Logger logger = LoggerFactory.getLogger(BufferedWriteFilter.class);
-    
+    private final Logger logger = LoggerFactory
+            .getLogger(BufferedWriteFilter.class);
+
     /**
      * Default buffer size value in bytes.
      */
@@ -63,7 +64,7 @@
      * The map that matches an {@link IoSession} and it's {@link IoBuffer}
      * buffer.
      */
-    protected Map<IoSession, IoBuffer> buffersMap = new HashMap<IoSession, IoBuffer>();
+    private final ConcurrentMap<IoSession, IoBuffer> buffersMap;
 
     /**
      * Default constructor. Sets buffer size to {@link #DEFAULT_BUFFER_SIZE}
@@ -81,6 +82,7 @@
     public BufferedWriteFilter(int bufferSize) {
         super();
         this.bufferSize = bufferSize;
+        buffersMap = new ConcurrentHashMap<IoSession, IoBuffer>();
     }
 
     /**
@@ -126,14 +128,10 @@
      * @param data the data to buffer
      */
     private void write(IoSession session, IoBuffer data) {
-        IoBuffer dest = null;
-        synchronized (buffersMap) {
+        IoBuffer dest = buffersMap.get(session);
+        if (dest == null) {
+            buffersMap.putIfAbsent(session, IoBuffer.allocate(bufferSize));
             dest = buffersMap.get(session);
-            if (dest == null) {
-                // Enforce the creation of a non-expandable buffer
-                dest = IoBuffer.allocate(bufferSize).setAutoExpand(false);
-                buffersMap.put(session, dest);
-            }
         }
 
         write(session, data, dest);
@@ -150,29 +148,28 @@
      * @param buf the buffer where data will be temporarily written 
      */
     private void write(IoSession session, IoBuffer data, IoBuffer buf) {
-        synchronized (buf) {
-            try {
-                int len = data.remaining();
-                if (len >= buf.capacity()) {
-                    /*
-                     * If the request length exceeds the size of the output buffer,
-                     * flush the output buffer and then write the data directly.
-                     */
-                    NextFilter nextFilter = session.getFilterChain()
-                            .getNextFilter(this);
-                    internalFlush(nextFilter, session, buf);
-                    nextFilter.filterWrite(session,
-                            new DefaultWriteRequest(buf));
-                    return;
-                }
-                if (len > (buf.limit() - buf.position())) {
-                    internalFlush(session.getFilterChain().getNextFilter(this),
-                            session, buf);
-                }
+        try {
+            int len = data.remaining();
+            if (len >= buf.capacity()) {
+                /*
+                 * If the request length exceeds the size of the output buffer,
+                 * flush the output buffer and then write the data directly.
+                 */
+                NextFilter nextFilter = session.getFilterChain().getNextFilter(
+                        this);
+                internalFlush(nextFilter, session, buf);
+                nextFilter.filterWrite(session, new DefaultWriteRequest(data));
+                return;
+            }
+            if (len > (buf.limit() - buf.position())) {
+                internalFlush(session.getFilterChain().getNextFilter(this),
+                        session, buf);
+            }
+            synchronized (buf) {
                 buf.put(data);
-            } catch (Throwable e) {
-                session.getFilterChain().fireExceptionCaught(e);
             }
+        } catch (Throwable e) {
+            session.getFilterChain().fireExceptionCaught(e);
         }
     }
 
@@ -181,16 +178,17 @@
      * 
      * @param nextFilter the {@link NextFilter} of this filter
      * @param session the session where buffer will be written
-     * @param data the data to write
+     * @param buf the data to write
      * @throws Exception if a write operation fails
      */
     private void internalFlush(NextFilter nextFilter, IoSession session,
-            IoBuffer data) throws Exception {
-        if (data != null) {
-            data.flip();
-            logger.debug("Flushing buffer: {}", data);            
-            nextFilter.filterWrite(session, new DefaultWriteRequest(data.duplicate()));
-            data.clear();
+            IoBuffer buf) throws Exception {
+        synchronized (buf) {
+            buf.flip();
+            logger.debug("Flushing buffer: {}", buf);
+            nextFilter.filterWrite(session, new DefaultWriteRequest(buf
+                    .duplicate()));
+            buf.clear();
         }
     }
 
@@ -201,13 +199,8 @@
      */
     public void flush(IoSession session) {
         try {
-            IoBuffer data = null;
-            synchronized (session) {
-                data = buffersMap.get(session);
-            }
             internalFlush(session.getFilterChain().getNextFilter(this),
-                    session, data);
-
+                    session, buffersMap.get(session));
         } catch (Throwable e) {
             session.getFilterChain().fireExceptionCaught(e);
         }
@@ -219,12 +212,10 @@
      * 
      * @param session the session we operate on
      */
-    private void clear(IoSession session) {
-        synchronized (session) {
-            IoBuffer buf = buffersMap.remove(session);
-            if (buf != null) {
-                buf.free();
-            }
+    private void free(IoSession session) {
+        IoBuffer buf = buffersMap.remove(session);
+        if (buf != null) {
+            buf.free();
         }
     }
 
@@ -234,7 +225,7 @@
     @Override
     public void exceptionCaught(NextFilter nextFilter, IoSession session,
             Throwable cause) throws Exception {
-        clear(session);
+        free(session);
     }
 
     /**
@@ -243,6 +234,6 @@
     @Override
     public void sessionClosed(NextFilter nextFilter, IoSession session)
             throws Exception {
-        clear(session);
+        free(session);
     }
 }
\ No newline at end of file