You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/07/26 11:12:38 UTC

svn commit: r559764 - in /mina: branches/1.0/core/src/main/java/org/apache/mina/common/support/ branches/1.0/core/src/main/java/org/apache/mina/filter/codec/ branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/ branches/1.0/core/src/ma...

Author: trustin
Date: Thu Jul 26 02:12:35 2007
New Revision: 559764

URL: http://svn.apache.org/viewvc?view=rev&rev=559764
Log:
Made sure the queued message in ProtocolCodecFilter is flushed out when resumeRead() is invoked.  To implement this, SocketIoProcessor and DatagramConnectorDelegate fire a messageReceived event with an empty buffer.  I made sure all filters process it correctly.  It's somewhat a kind of hack, but I though it's very easy to implement and affects only IoFilters.



Modified:
    mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/util/ByteBufferUtil.java
    mina/branches/1.0/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java
    mina/branches/1.0/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/util/ByteBufferUtil.java
    mina/branches/1.1/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java
    mina/branches/1.1/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java
    mina/trunk/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/filter/ProfilerTimerFilter.java
    mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
    mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java
    mina/trunk/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java Thu Jul 26 02:12:35 2007
@@ -565,7 +565,9 @@
         public void messageReceived(NextFilter nextFilter, IoSession session,
                 Object message) throws Exception {
             try {
-                session.getHandler().messageReceived(session, message);
+                if (!ByteBufferUtil.isEmpty(message)) {
+                    session.getHandler().messageReceived(session, message);
+                }
             } finally {
                 ByteBufferUtil.releaseIfPossible(message);
             }

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Thu Jul 26 02:12:35 2007
@@ -149,8 +149,10 @@
         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
 
         try {
-            synchronized (decoderOut) {
-                decoder.decode(session, in, decoderOut);
+            if (in.hasRemaining()) {
+                synchronized (decoderOut) {
+                    decoder.decode(session, in, decoderOut);
+                }
             }
         } catch (Throwable t) {
             ProtocolDecoderException pde;

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Thu Jul 26 02:12:35 2007
@@ -29,6 +29,7 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.TrafficMask;
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.WriteTimeoutException;
 import org.apache.mina.util.NamePreservingRunnable;
@@ -455,8 +456,19 @@
             }
 
             // Now mask the preferred ops with the mask of the current session
-            int mask = session.getTrafficMask().getInterestOps();
-            key.interestOps(ops & mask);
+            TrafficMask trafficMask = session.getTrafficMask();
+            int opsMask = trafficMask.getInterestOps();
+            if ((key.interestOps() & SelectionKey.OP_READ) == 0 &&
+                    trafficMask.isReadable()) {
+                // This is a somewhat ugly workaround for the case that
+                // ProtocolCodecFilter is in the filter chain.
+                // Firing messageReceived() event with an empty buffer
+                // triggers ProtocolCodecFilter to flush any queued
+                // messageReceived() events on resumeRead().
+                session.getFilterChain().fireMessageReceived(
+                        session, ByteBuffer.wrap(new byte[0]));
+            }
+            key.interestOps(ops & opsMask);
         }
     }
 

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Thu Jul 26 02:12:35 2007
@@ -36,6 +36,7 @@
 import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionRecycler;
+import org.apache.mina.common.TrafficMask;
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.AbstractIoFilterChain;
 import org.apache.mina.common.support.BaseIoConnector;
@@ -291,8 +292,19 @@
             }
 
             // Now mask the preferred ops with the mask of the current session
-            int mask = session.getTrafficMask().getInterestOps();
-            key.interestOps(ops & mask);
+            TrafficMask trafficMask = session.getTrafficMask();
+            int opsMask = trafficMask.getInterestOps();
+            if ((key.interestOps() & SelectionKey.OP_READ) == 0 &&
+                    trafficMask.isReadable()) {
+                // This is a somewhat ugly workaround for the case that
+                // ProtocolCodecFilter is in the filter chain.
+                // Firing messageReceived() event with an empty buffer
+                // triggers ProtocolCodecFilter to flush any queued
+                // messageReceived() events on resumeRead().
+                session.getFilterChain().fireMessageReceived(
+                        session, ByteBuffer.wrap(new byte[0]));
+            }
+            key.interestOps(ops & opsMask);
         }
     }
 

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/util/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/util/ByteBufferUtil.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/util/ByteBufferUtil.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/util/ByteBufferUtil.java Thu Jul 26 02:12:35 2007
@@ -39,6 +39,14 @@
             ((ByteBuffer) message).release();
         }
     }
+    
+    public static boolean isEmpty(Object message) {
+        if (message instanceof ByteBuffer) {
+            return !((ByteBuffer) message).hasRemaining();
+        } else {
+            return false;
+        }
+    }
 
     private ByteBufferUtil() {
     }

Modified: mina/branches/1.0/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/branches/1.0/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java (original)
+++ mina/branches/1.0/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java Thu Jul 26 02:12:35 2007
@@ -156,9 +156,14 @@
         }
 
         ByteBuffer inBuffer = (ByteBuffer) message;
-        ByteBuffer outBuffer = inflater.inflate(inBuffer);
-        inBuffer.release();
-        nextFilter.messageReceived(session, outBuffer);
+        if (!inBuffer.hasRemaining()) {
+            // Ignore empty buffers
+            nextFilter.messageReceived(session, inBuffer);
+        } else {
+            ByteBuffer outBuffer = inflater.inflate(inBuffer);
+            inBuffer.release();
+            nextFilter.messageReceived(session, outBuffer);
+        }
     }
 
     /*

Modified: mina/branches/1.0/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/branches/1.0/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java (original)
+++ mina/branches/1.0/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java Thu Jul 26 02:12:35 2007
@@ -407,8 +407,8 @@
                         }
 
                         if (buf.hasRemaining()) {
-                            handler.scheduleMessageReceived(nextFilter,
-                                    buf);
+                            // Forward the data received after closure.
+                            handler.scheduleMessageReceived(nextFilter, buf);
                         }
                     }
                 } catch (SSLException ssle) {

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java Thu Jul 26 02:12:35 2007
@@ -566,7 +566,9 @@
         public void messageReceived(NextFilter nextFilter, IoSession session,
                 Object message) throws Exception {
             try {
-                session.getHandler().messageReceived(session, message);
+                if (!ByteBufferUtil.isEmpty(message)) {
+                    session.getHandler().messageReceived(session, message);
+                }
             } finally {
                 ByteBufferUtil.releaseIfPossible(message);
             }

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Thu Jul 26 02:12:35 2007
@@ -152,8 +152,10 @@
         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
 
         try {
-            synchronized (decoderOut) {
-                decoder.decode(session, in, decoderOut);
+            if (in.hasRemaining()) {
+                synchronized (decoderOut) {
+                    decoder.decode(session, in, decoderOut);
+                }
             }
         } catch (Throwable t) {
             ProtocolDecoderException pde;

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Thu Jul 26 02:12:35 2007
@@ -31,6 +31,7 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.TrafficMask;
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.WriteTimeoutException;
 import org.apache.mina.util.NamePreservingRunnable;
@@ -409,8 +410,19 @@
             }
 
             // Now mask the preferred ops with the mask of the current session
-            int mask = session.getTrafficMask().getInterestOps();
-            key.interestOps(ops & mask);
+            TrafficMask trafficMask = session.getTrafficMask();
+            int opsMask = trafficMask.getInterestOps();
+            if ((key.interestOps() & SelectionKey.OP_READ) == 0 &&
+                    trafficMask.isReadable()) {
+                // This is a somewhat ugly workaround for the case that
+                // ProtocolCodecFilter is in the filter chain.
+                // Firing messageReceived() event with an empty buffer
+                // triggers ProtocolCodecFilter to flush any queued
+                // messageReceived() events on resumeRead().
+                session.getFilterChain().fireMessageReceived(
+                        session, ByteBuffer.wrap(new byte[0]));
+            }
+            key.interestOps(ops & opsMask);
         }
     }
 

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Thu Jul 26 02:12:35 2007
@@ -40,6 +40,7 @@
 import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionRecycler;
+import org.apache.mina.common.TrafficMask;
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.AbstractIoFilterChain;
 import org.apache.mina.common.support.BaseIoConnector;
@@ -275,8 +276,19 @@
             }
 
             // Now mask the preferred ops with the mask of the current session
-            int mask = session.getTrafficMask().getInterestOps();
-            key.interestOps(ops & mask);
+            TrafficMask trafficMask = session.getTrafficMask();
+            int opsMask = trafficMask.getInterestOps();
+            if ((key.interestOps() & SelectionKey.OP_READ) == 0 &&
+                    trafficMask.isReadable()) {
+                // This is a somewhat ugly workaround for the case that
+                // ProtocolCodecFilter is in the filter chain.
+                // Firing messageReceived() event with an empty buffer
+                // triggers ProtocolCodecFilter to flush any queued
+                // messageReceived() events on resumeRead().
+                session.getFilterChain().fireMessageReceived(
+                        session, ByteBuffer.wrap(new byte[0]));
+            }
+            key.interestOps(ops & opsMask);
         }
     }
 

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/util/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/util/ByteBufferUtil.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/util/ByteBufferUtil.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/util/ByteBufferUtil.java Thu Jul 26 02:12:35 2007
@@ -39,6 +39,14 @@
             ((ByteBuffer) message).release();
         }
     }
+    
+    public static boolean isEmpty(Object message) {
+        if (message instanceof ByteBuffer) {
+            return !((ByteBuffer) message).hasRemaining();
+        } else {
+            return false;
+        }
+    }
 
     private ByteBufferUtil() {
     }

Modified: mina/branches/1.1/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/branches/1.1/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java (original)
+++ mina/branches/1.1/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java Thu Jul 26 02:12:35 2007
@@ -156,9 +156,14 @@
         }
 
         ByteBuffer inBuffer = (ByteBuffer) message;
-        ByteBuffer outBuffer = inflater.inflate(inBuffer);
-        inBuffer.release();
-        nextFilter.messageReceived(session, outBuffer);
+        if (!inBuffer.hasRemaining()) {
+            // Ignore empty buffers
+            nextFilter.messageReceived(session, inBuffer);
+        } else {
+            ByteBuffer outBuffer = inflater.inflate(inBuffer);
+            inBuffer.release();
+            nextFilter.messageReceived(session, outBuffer);
+        }
     }
 
     /*

Modified: mina/branches/1.1/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/branches/1.1/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java (original)
+++ mina/branches/1.1/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java Thu Jul 26 02:12:35 2007
@@ -407,8 +407,8 @@
                         }
 
                         if (buf.hasRemaining()) {
-                            handler.scheduleMessageReceived(nextFilter,
-                                    buf);
+                            // Forward the data received after closure.
+                            handler.scheduleMessageReceived(nextFilter, buf);
                         }
                     }
                 } catch (SSLException ssle) {

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/support/AbstractIoFilterChain.java Thu Jul 26 02:12:35 2007
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoFilter;
@@ -609,7 +610,10 @@
         @Override
         public void messageReceived(NextFilter nextFilter, IoSession session,
                 Object message) throws Exception {
-            session.getHandler().messageReceived(session, message);
+            if (!(message instanceof ByteBuffer) || ((ByteBuffer) message).hasRemaining()) {
+                // Empty buffers are used as a special internal signal, so ignore it.
+                session.getHandler().messageReceived(session, message);
+            }
         }
 
         @Override

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/ProfilerTimerFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/ProfilerTimerFilter.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/ProfilerTimerFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/ProfilerTimerFilter.java Thu Jul 26 02:12:35 2007
@@ -25,6 +25,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoEventType;
 import org.apache.mina.common.IoFilterAdapter;
@@ -166,6 +167,12 @@
     @Override
     public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception
     {
+        if (message instanceof ByteBuffer && !((ByteBuffer) message).hasRemaining()) {
+            // Ignore the special signal.
+            nextFilter.messageReceived(session, message);
+            return;
+        }
+        
         long start = timeUnit.timeNow();
         nextFilter.messageReceived( session, message );
         long end = timeUnit.timeNow();

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Thu Jul 26 02:12:35 2007
@@ -157,8 +157,10 @@
         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
 
         try {
-            synchronized (decoderOut) {
-                decoder.decode(session, in, decoderOut);
+            if (in.hasRemaining()) {
+                synchronized (decoderOut) {
+                    decoder.decode(session, in, decoderOut);
+                }
             }
         } catch (Throwable t) {
             ProtocolDecoderException pde;

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Thu Jul 26 02:12:35 2007
@@ -35,6 +35,7 @@
 import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.TrafficMask;
 import org.apache.mina.common.WriteRequest;
 import org.apache.mina.common.WriteTimeoutException;
 import org.apache.mina.common.support.IoServiceListenerSupport;
@@ -450,8 +451,19 @@
             }
 
             // Now mask the preferred ops with the mask of the current session
-            int mask = session.getTrafficMask().getInterestOps();
-            key.interestOps(ops & mask);
+            TrafficMask trafficMask = session.getTrafficMask();
+            int opsMask = trafficMask.getInterestOps();
+            if ((key.interestOps() & SelectionKey.OP_READ) == 0 &&
+                    trafficMask.isReadable()) {
+                // This is a somewhat ugly workaround for the case that
+                // ProtocolCodecFilter is in the filter chain.
+                // Firing messageReceived() event with an empty buffer
+                // triggers ProtocolCodecFilter to flush any queued
+                // messageReceived() events on resumeRead().
+                session.getFilterChain().fireMessageReceived(
+                        session, ByteBuffer.EMPTY_BUFFER);
+            }
+            key.interestOps(ops & opsMask);
         }
     }
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Thu Jul 26 02:12:35 2007
@@ -36,6 +36,7 @@
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.TrafficMask;
 import org.apache.mina.common.TransportType;
 import org.apache.mina.common.WriteRequest;
 import org.apache.mina.common.support.AbstractIoFilterChain;
@@ -224,8 +225,19 @@
             }
 
             // Now mask the preferred ops with the mask of the current session
-            int mask = session.getTrafficMask().getInterestOps();
-            key.interestOps(ops & mask);
+            TrafficMask trafficMask = session.getTrafficMask();
+            int opsMask = trafficMask.getInterestOps();
+            if ((key.interestOps() & SelectionKey.OP_READ) == 0 &&
+                    trafficMask.isReadable()) {
+                // This is a somewhat ugly workaround for the case that
+                // ProtocolCodecFilter is in the filter chain.
+                // Firing messageReceived() event with an empty buffer
+                // triggers ProtocolCodecFilter to flush any queued
+                // messageReceived() events on resumeRead().
+                session.getFilterChain().fireMessageReceived(
+                        session, ByteBuffer.wrap(new byte[0]));
+            }
+            key.interestOps(ops & opsMask);
         }
     }
 

Modified: mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java (original)
+++ mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/CompressionFilter.java Thu Jul 26 02:12:35 2007
@@ -159,8 +159,13 @@
         }
 
         ByteBuffer inBuffer = (ByteBuffer) message;
-        ByteBuffer outBuffer = inflater.inflate(inBuffer);
-        nextFilter.messageReceived(session, outBuffer);
+        if (!inBuffer.hasRemaining()) {
+            // Ignore empty buffers
+            nextFilter.messageReceived(session, inBuffer);
+        } else {
+            ByteBuffer outBuffer = inflater.inflate(inBuffer);
+            nextFilter.messageReceived(session, outBuffer);
+        }
     }
 
     /*

Modified: mina/trunk/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java?view=diff&rev=559764&r1=559763&r2=559764
==============================================================================
--- mina/trunk/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java (original)
+++ mina/trunk/filter-ssl/src/main/java/org/apache/mina/filter/SSLFilter.java Thu Jul 26 02:12:35 2007
@@ -441,8 +441,8 @@
                         }
 
                         if (buf.hasRemaining()) {
-                            handler.scheduleMessageReceived(nextFilter,
-                                    buf);
+                            // Forward the data received after closure.
+                            handler.scheduleMessageReceived(nextFilter, buf);
                         }
                     }
                 } catch (SSLException ssle) {