You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jo...@apache.org on 2021/06/02 00:40:25 UTC

[mina] branch bugfix/DIRMINA-1142 created (now 024f23d)

This is an automated email from the ASF dual-hosted git repository.

johnnyv pushed a change to branch bugfix/DIRMINA-1142
in repository https://gitbox.apache.org/repos/asf/mina.git.


      at 024f23d  initial work on parallel codec api

This branch includes the following new commits:

     new eb3a160  Adds unit test for DIRMINA-1142
     new 024f23d  initial work on parallel codec api

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[mina] 01/02: Adds unit test for DIRMINA-1142

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

johnnyv pushed a commit to branch bugfix/DIRMINA-1142
in repository https://gitbox.apache.org/repos/asf/mina.git

commit eb3a160febd4db7d4f07cde7fb8727adb984ebdb
Author: Jonathan Valliere <jo...@apache.org>
AuthorDate: Tue May 18 13:31:40 2021 -0400

    Adds unit test for DIRMINA-1142
---
 .../filter/codec/ParallelProtocolEncoderTest.java  | 184 +++++++++++++++++++++
 1 file changed, 184 insertions(+)

diff --git a/mina-core/src/test/java/org/apache/mina/filter/codec/ParallelProtocolEncoderTest.java b/mina-core/src/test/java/org/apache/mina/filter/codec/ParallelProtocolEncoderTest.java
new file mode 100644
index 0000000..905dcf1
--- /dev/null
+++ b/mina-core/src/test/java/org/apache/mina/filter/codec/ParallelProtocolEncoderTest.java
@@ -0,0 +1,184 @@
+package org.apache.mina.filter.codec;
+
+import static org.junit.Assert.assertTrue;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.future.IoFutureListener;
+import org.apache.mina.core.future.WriteFuture;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
+import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.junit.Test;
+
+public class ParallelProtocolEncoderTest {
+	private NioSocketConnector connector = null;
+	private NioSocketAcceptor acceptor = null;
+	private static int LOOP = 1000;
+	private static int THREAD = 3;
+
+	private static Logger logger = LogManager.getLogger(ParallelProtocolEncoderTest.class);
+	private static ExecutorService executorService = Executors.newFixedThreadPool(THREAD);
+
+	@Test
+	public void missingMessageTest() throws Exception {
+		String host = "localhost";
+		int port = 28_000;
+
+		// server
+		acceptor = new NioSocketAcceptor();
+		acceptor.getFilterChain().addFirst("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
+		ServerHandler serverHandler = new ServerHandler();
+		acceptor.setHandler(serverHandler);
+		acceptor.bind(new InetSocketAddress(host, port));
+
+		// client
+		connector = new NioSocketConnector(1);
+		connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
+		ClientHandler clientHandler = new ClientHandler();
+		connector.setHandler(clientHandler);
+		ConnectFuture connectFuture = connector.connect(new InetSocketAddress(host, 28_000));
+		connectFuture.awaitUninterruptibly();
+
+		final IoSession ioSession = connectFuture.getSession();
+
+		logger.info("missingMessageTest.begin with " + LOOP + " messages and " + THREAD + " threads");
+
+		for (int i = 1; i <= LOOP; i++) {
+			final String message = "Message:" + i;
+			executorService.submit(new Runnable() {
+
+				@Override
+				public void run() {
+
+					logger.info("missingMessageTest.client.write "+message);
+
+					final WriteFuture future = ioSession.write(message);
+					if (future != null) {
+						future.addListener(new IoFutureListener<WriteFuture>() {
+							@Override
+							public void operationComplete(WriteFuture writeFuture) {
+								if (!future.isWritten()) {
+									logger.error("writeFuture: " + writeFuture.getException());
+								}
+							}
+						});
+					}
+				}
+			});
+
+		}
+		logger.info("missingMessageTest.end");
+
+		int maxSleep = 5_000;
+		int time = 1000;
+		int sleep = 0;
+		while ((!clientHandler.isFinished() || !serverHandler.isFinished()) && maxSleep > sleep) {
+			sleep += time;
+			logger.info("missingMessageTest.sleep... " + sleep);
+			Thread.sleep(time);
+		}
+
+		logger.info("missingMessageTest.close");
+
+		ioSession.closeNow();
+		connector.dispose();
+		acceptor.dispose();
+
+		if (!serverHandler.isFinished()) {
+			Set<String> missingMessages = clientHandler.getMessages();
+			missingMessages.removeAll(serverHandler.getMessages());
+			logger.error("missing <" + missingMessages.size() + "> messages : " + missingMessages);
+		}
+
+		assertTrue(serverHandler.isFinished());
+		assertTrue(clientHandler.isFinished());
+	}
+
+	private static class ServerHandler extends IoHandlerAdapter {
+		private Set<String> messages = new HashSet<>(LOOP);
+		private AtomicInteger count = new AtomicInteger(0);
+
+		@Override
+		public void messageReceived(IoSession session, Object message) throws Exception {
+
+			String messageString = (String) message;
+			count.incrementAndGet();
+
+			if (messages.contains(messageString)) {
+				logger.error("messageReceived: message <" + messageString + "> already received");
+			}
+			messages.add(messageString);
+
+			// logger.info("messageReceived: <"+message+">, count="+count);
+
+			if (isFinished()) {
+				logger.info("messageReceived: finish");
+			}
+
+			super.messageReceived(session, message);
+		}
+
+		public boolean isFinished() {
+			return count.get() == LOOP;
+		}
+
+		/**
+		 * Get the messages.
+		 *
+		 * @return the messages
+		 */
+		public Set<String> getMessages() {
+			return messages;
+		}
+	}
+
+	private static class ClientHandler extends IoHandlerAdapter {
+		private Set<String> messages = new HashSet<>(LOOP);
+		private AtomicInteger count = new AtomicInteger(0);
+
+		@Override
+		public void messageSent(IoSession session, Object message) throws Exception {
+
+			logger.info("messageSent " + message);
+			
+			count.incrementAndGet();
+			String messageString = (String) message;
+			if (messages.contains(messageString)) {
+				logger.error("messageSent: message <" + messageString + "> already sent");
+			}
+			messages.add(messageString);
+
+			// logger.info("messageSent: <"+message+">, count="+count);
+
+			if (isFinished()) {
+				logger.info("messageSent: finish");
+			}
+			super.messageSent(session, message);
+		}
+
+		public boolean isFinished() {
+			return count.get() == LOOP;
+		}
+
+		/**
+		 * Get the messages.
+		 *
+		 * @return the messages
+		 */
+		public Set<String> getMessages() {
+			return messages;
+		}
+	}
+}

[mina] 02/02: initial work on parallel codec api

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

johnnyv pushed a commit to branch bugfix/DIRMINA-1142
in repository https://gitbox.apache.org/repos/asf/mina.git

commit 024f23db166c83dcd359221f6baea2c0d0505db2
Author: Jonathan Valliere <jo...@apache.org>
AuthorDate: Tue Jun 1 20:39:37 2021 -0400

    initial work on parallel codec api
---
 .../codec/AbstractProtocolDecoderOutput.java       |  66 +-
 .../codec/AbstractProtocolEncoderOutput.java       | 101 +--
 .../mina/filter/codec/ProtocolCodecFilter.java     | 918 +++++++++------------
 .../mina/filter/codec/ProtocolCodecSession.java    |  15 +-
 .../mina/filter/codec/ProtocolEncoderOutput.java   |  42 +-
 .../apache/mina/http/HttpServerDecoderTest.java    | 472 ++++++-----
 6 files changed, 714 insertions(+), 900 deletions(-)

diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java
index 23a54c0..2997e6a 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java
@@ -19,41 +19,49 @@
  */
 package org.apache.mina.filter.codec;
 
-import java.util.LinkedList;
+import java.util.ArrayDeque;
 import java.util.Queue;
 
+import org.apache.mina.core.filterchain.IoFilter.NextFilter;
+import org.apache.mina.core.session.IoSession;
+
 /**
  * A {@link ProtocolDecoderOutput} based on queue.
  *
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public abstract class AbstractProtocolDecoderOutput implements ProtocolDecoderOutput {
-    /** The queue where decoded messages are stored */
-    private final Queue<Object> messageQueue = new LinkedList<>();
-
-    /**
-     * Creates a new instance of a AbstractProtocolDecoderOutput
-     */
-    public AbstractProtocolDecoderOutput() {
-        // Do nothing
-    }
-
-    /**
-     * @return The decoder's message queue
-     */
-    public Queue<Object> getMessageQueue() {
-        return messageQueue;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void write(Object message) {
-        if (message == null) {
-            throw new IllegalArgumentException("message");
-        }
-
-        messageQueue.add(message);
-    }
+	/** The queue where decoded messages are stored */
+	protected final Queue<Object> messageQueue = new ArrayDeque<>();
+
+	/**
+	 * Creates a new instance of a AbstractProtocolDecoderOutput
+	 */
+	public AbstractProtocolDecoderOutput() {
+		// Do nothing
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void write(Object message) {
+		if (message == null) {
+			throw new IllegalArgumentException("message");
+		}
+
+		messageQueue.add(message);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void flush(NextFilter nextFilter, IoSession session) {
+		Object message = null;
+
+		while ((message = messageQueue.poll()) != null) {
+			nextFilter.messageReceived(session, message);
+		}
+	}
 }
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java
index e369ba9..58b8852 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java
@@ -19,10 +19,8 @@
  */
 package org.apache.mina.filter.codec;
 
+import java.util.ArrayDeque;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.mina.core.buffer.IoBuffer;
 
 /**
  * A {@link ProtocolEncoderOutput} based on queue.
@@ -30,80 +28,25 @@ import org.apache.mina.core.buffer.IoBuffer;
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public abstract class AbstractProtocolEncoderOutput implements ProtocolEncoderOutput {
-    /** The queue where the decoded messages are stored */
-    private final Queue<Object> messageQueue = new ConcurrentLinkedQueue<>();
-
-    private boolean buffersOnly = true;
-
-    /**
-     * Creates an instance of AbstractProtocolEncoderOutput
-     */
-    public AbstractProtocolEncoderOutput() {
-        // Do nothing
-    }
-
-    /**
-     * @return The message queue
-     */
-    public Queue<Object> getMessageQueue() {
-        return messageQueue;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void write(Object encodedMessage) {
-        if (encodedMessage instanceof IoBuffer) {
-            IoBuffer buf = (IoBuffer) encodedMessage;
-            if (buf.hasRemaining()) {
-                messageQueue.offer(buf);
-            } else {
-                throw new IllegalArgumentException("buf is empty. Forgot to call flip()?");
-            }
-        } else {
-            messageQueue.offer(encodedMessage);
-            buffersOnly = false;
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void mergeAll() {
-        if (!buffersOnly) {
-            throw new IllegalStateException("the encoded message list contains a non-buffer.");
-        }
-
-        final int size = messageQueue.size();
-
-        if (size < 2) {
-            // no need to merge!
-            return;
-        }
-
-        // Get the size of merged BB
-        int sum = 0;
-        for (Object b : messageQueue) {
-            sum += ((IoBuffer) b).remaining();
-        }
-
-        // Allocate a new BB that will contain all fragments
-        IoBuffer newBuf = IoBuffer.allocate(sum);
-
-        // and merge all.
-        for (;;) {
-            IoBuffer buf = (IoBuffer) messageQueue.poll();
-            if (buf == null) {
-                break;
-            }
-
-            newBuf.put(buf);
-        }
-
-        // Push the new buffer finally.
-        newBuf.flip();
-        messageQueue.add(newBuf);
-    }
+	/** The queue where the decoded messages are stored */
+	protected final Queue<Object> messageQueue = new ArrayDeque<>();
+
+	/**
+	 * Creates an instance of AbstractProtocolEncoderOutput
+	 */
+	public AbstractProtocolEncoderOutput() {
+		// Do nothing
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void write(Object message) {
+		if (message == null) {
+			throw new IllegalArgumentException("message");
+		}
+
+		messageQueue.offer(message);
+	}
 }
\ No newline at end of file
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
index a460b3d..93039e8 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
@@ -27,13 +27,10 @@ import org.apache.mina.core.file.FileRegion;
 import org.apache.mina.core.filterchain.IoFilter;
 import org.apache.mina.core.filterchain.IoFilterAdapter;
 import org.apache.mina.core.filterchain.IoFilterChain;
-import org.apache.mina.core.future.DefaultWriteFuture;
 import org.apache.mina.core.future.WriteFuture;
-import org.apache.mina.core.session.AbstractIoSession;
 import org.apache.mina.core.session.AttributeKey;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.core.write.DefaultWriteRequest;
-import org.apache.mina.core.write.NothingWrittenException;
 import org.apache.mina.core.write.WriteRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,509 +44,418 @@ import org.slf4j.LoggerFactory;
  * @org.apache.xbean.XBean
  */
 public class ProtocolCodecFilter extends IoFilterAdapter {
-    /** A logger for this class */
-    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
-
-    private static final Class<?>[] EMPTY_PARAMS = new Class[0];
-
-    private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
-
-    private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
-
-    private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
-
-    private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
-
-    private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
-
-    /** The factory responsible for creating the encoder and decoder */
-    private final ProtocolCodecFactory factory;
-
-    /**
-     * Creates a new instance of ProtocolCodecFilter, associating a factory
-     * for the creation of the encoder and decoder.
-     *
-     * @param factory The associated factory
-     */
-    public ProtocolCodecFilter(ProtocolCodecFactory factory) {
-        if (factory == null) {
-            throw new IllegalArgumentException("factory");
-        }
-
-        this.factory = factory;
-    }
-
-    /**
-     * Creates a new instance of ProtocolCodecFilter, without any factory.
-     * The encoder/decoder factory will be created as an inner class, using
-     * the two parameters (encoder and decoder).
-     * 
-     * @param encoder The class responsible for encoding the message
-     * @param decoder The class responsible for decoding the message
-     */
-    public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
-        if (encoder == null) {
-            throw new IllegalArgumentException("encoder");
-        }
-        if (decoder == null) {
-            throw new IllegalArgumentException("decoder");
-        }
-
-        // Create the inner Factory based on the two parameters
-        this.factory = new ProtocolCodecFactory() {
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public ProtocolEncoder getEncoder(IoSession session) {
-                return encoder;
-            }
-
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public ProtocolDecoder getDecoder(IoSession session) {
-                return decoder;
-            }
-        };
-    }
-
-    /**
-     * Creates a new instance of ProtocolCodecFilter, without any factory.
-     * The encoder/decoder factory will be created as an inner class, using
-     * the two parameters (encoder and decoder), which are class names. Instances
-     * for those classes will be created in this constructor.
-     * 
-     * @param encoderClass The class responsible for encoding the message
-     * @param decoderClass The class responsible for decoding the message
-     */
-    public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
-            final Class<? extends ProtocolDecoder> decoderClass) {
-        if (encoderClass == null) {
-            throw new IllegalArgumentException("encoderClass");
-        }
-        if (decoderClass == null) {
-            throw new IllegalArgumentException("decoderClass");
-        }
-        if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
-            throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
-        }
-        if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
-            throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
-        }
-        try {
-            encoderClass.getConstructor(EMPTY_PARAMS);
-        } catch (NoSuchMethodException e) {
-            throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
-        }
-        try {
-            decoderClass.getConstructor(EMPTY_PARAMS);
-        } catch (NoSuchMethodException e) {
-            throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
-        }
-
-        final ProtocolEncoder encoder;
-
-        try {
-            encoder = encoderClass.newInstance();
-        } catch (Exception e) {
-            throw new IllegalArgumentException("encoderClass cannot be initialized");
-        }
-
-        final ProtocolDecoder decoder;
-
-        try {
-            decoder = decoderClass.newInstance();
-        } catch (Exception e) {
-            throw new IllegalArgumentException("decoderClass cannot be initialized");
-        }
-
-        // Create the inner factory based on the two parameters.
-        this.factory = new ProtocolCodecFactory() {
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public ProtocolEncoder getEncoder(IoSession session) throws Exception {
-                return encoder;
-            }
-
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public ProtocolDecoder getDecoder(IoSession session) throws Exception {
-                return decoder;
-            }
-        };
-    }
-
-    /**
-     * Get the encoder instance from a given session.
-     *
-     * @param session The associated session we will get the encoder from
-     * @return The encoder instance, if any
-     */
-    public ProtocolEncoder getEncoder(IoSession session) {
-        return (ProtocolEncoder) session.getAttribute(ENCODER);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
-        if (parent.contains(this)) {
-            throw new IllegalArgumentException(
-                    "You can't add the same filter instance more than once.  Create another instance and add it.");
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
-        // Clean everything
-        disposeCodec(parent.getSession());
-    }
-
-    /**
-     * Process the incoming message, calling the session decoder. As the incoming
-     * buffer might contains more than one messages, we have to loop until the decoder
-     * throws an exception.
-     * 
-     *  while ( buffer not empty )
-     *    try
-     *      decode ( buffer )
-     *    catch
-     *      break;
-     * 
-     */
-    @Override
-    public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
-        }
-
-        if (!(message instanceof IoBuffer)) {
-            nextFilter.messageReceived(session, message);
-            return;
-        }
-
-        IoBuffer in = (IoBuffer) message;
-        ProtocolDecoder decoder = factory.getDecoder(session);
-        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
-
-        // Loop until we don't have anymore byte in the buffer,
-        // or until the decoder throws an unrecoverable exception or
-        // can't decoder a message, because there are not enough
-        // data in the buffer
-        while (in.hasRemaining()) {
-            int oldPos = in.position();
-            try {
-                synchronized (session) {
-                    // Call the decoder with the read bytes
-                    decoder.decode(session, in, decoderOut);
-                }
-                // Finish decoding if no exception was thrown.
-                decoderOut.flush(nextFilter, session);
-            } catch (Exception e) {
-                ProtocolDecoderException pde;
-                if (e instanceof ProtocolDecoderException) {
-                    pde = (ProtocolDecoderException) e;
-                } else {
-                    pde = new ProtocolDecoderException(e);
-                }
-                if (pde.getHexdump() == null) {
-                    // Generate a message hex dump
-                    int curPos = in.position();
-                    in.position(oldPos);
-                    pde.setHexdump(in.getHexDump());
-                    in.position(curPos);
-                }
-                // Fire the exceptionCaught event.
-                decoderOut.flush(nextFilter, session);
-                nextFilter.exceptionCaught(session, pde);
-                // Retry only if the type of the caught exception is
-                // recoverable and the buffer position has changed.
-                // We check buffer position additionally to prevent an
-                // infinite loop.
-                if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
-                    break;
-                }
-            }
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
-        if (writeRequest instanceof EncodedWriteRequest) {
-            return;
-        }
-
-        nextFilter.messageSent(session, writeRequest);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
-        Object message = writeRequest.getMessage();
-
-        // Bypass the encoding if the message is contained in a IoBuffer,
-        // as it has already been encoded before
-        if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
-            nextFilter.filterWrite(session, writeRequest);
-            return;
-        }
-
-        // Get the encoder in the session
-        ProtocolEncoder encoder = factory.getEncoder(session);
-
-        ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
-
-        if (encoder == null) {
-            throw new ProtocolEncoderException("The encoder is null for the session " + session);
-        }
-
-        try {
-            // Now we can try to encode the response
-            encoder.encode(session, message, encoderOut);
-
-            // Send it directly
-            Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
-
-            // Write all the encoded messages now
-            while (!bufferQueue.isEmpty()) {
-                Object encodedMessage = bufferQueue.poll();
-
-                if (encodedMessage == null) {
-                    break;
-                }
-
-		// Flush only when the buffer has remaining.
-		if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
-		    if (bufferQueue.isEmpty()) {
-			writeRequest.setMessage(encodedMessage);
+	/** A logger for this class */
+	private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
+
+	private static final Class<?>[] EMPTY_PARAMS = new Class[0];
+
+	private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
+
+	private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
+
+	private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
+
+	private static final ProtocolDecoderOutputLocal DECODER_OUTPUT = new ProtocolDecoderOutputLocal();
+
+	private static final ProtocolEncoderOutputLocal ENCODER_OUTPUT = new ProtocolEncoderOutputLocal();
+
+	/** The factory responsible for creating the encoder and decoder */
+	private final ProtocolCodecFactory factory;
+
+	/**
+	 * Creates a new instance of ProtocolCodecFilter, associating a factory for the
+	 * creation of the encoder and decoder.
+	 *
+	 * @param factory The associated factory
+	 */
+	public ProtocolCodecFilter(ProtocolCodecFactory factory) {
+		if (factory == null) {
+			throw new IllegalArgumentException("factory");
+		}
+
+		this.factory = factory;
+	}
+
+	/**
+	 * Creates a new instance of ProtocolCodecFilter, without any factory. The
+	 * encoder/decoder factory will be created as an inner class, using the two
+	 * parameters (encoder and decoder).
+	 * 
+	 * @param encoder The class responsible for encoding the message
+	 * @param decoder The class responsible for decoding the message
+	 */
+	public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
+		if (encoder == null) {
+			throw new IllegalArgumentException("encoder");
+		}
+		if (decoder == null) {
+			throw new IllegalArgumentException("decoder");
+		}
+
+		// Create the inner Factory based on the two parameters
+		this.factory = new ProtocolCodecFactory() {
+			/**
+			 * {@inheritDoc}
+			 */
+			@Override
+			public ProtocolEncoder getEncoder(IoSession session) {
+				return encoder;
+			}
+
+			/**
+			 * {@inheritDoc}
+			 */
+			@Override
+			public ProtocolDecoder getDecoder(IoSession session) {
+				return decoder;
+			}
+		};
+	}
+
+	/**
+	 * Creates a new instance of ProtocolCodecFilter, without any factory. The
+	 * encoder/decoder factory will be created as an inner class, using the two
+	 * parameters (encoder and decoder), which are class names. Instances for those
+	 * classes will be created in this constructor.
+	 * 
+	 * @param encoderClass The class responsible for encoding the message
+	 * @param decoderClass The class responsible for decoding the message
+	 */
+	public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
+			final Class<? extends ProtocolDecoder> decoderClass) {
+		if (encoderClass == null) {
+			throw new IllegalArgumentException("encoderClass");
+		}
+		if (decoderClass == null) {
+			throw new IllegalArgumentException("decoderClass");
+		}
+		if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
+			throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
+		}
+		if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
+			throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
+		}
+		try {
+			encoderClass.getConstructor(EMPTY_PARAMS);
+		} catch (NoSuchMethodException e) {
+			throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
+		}
+		try {
+			decoderClass.getConstructor(EMPTY_PARAMS);
+		} catch (NoSuchMethodException e) {
+			throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
+		}
+
+		final ProtocolEncoder encoder;
+
+		try {
+			encoder = encoderClass.newInstance();
+		} catch (Exception e) {
+			throw new IllegalArgumentException("encoderClass cannot be initialized");
+		}
+
+		final ProtocolDecoder decoder;
+
+		try {
+			decoder = decoderClass.newInstance();
+		} catch (Exception e) {
+			throw new IllegalArgumentException("decoderClass cannot be initialized");
+		}
+
+		// Create the inner factory based on the two parameters.
+		this.factory = new ProtocolCodecFactory() {
+			/**
+			 * {@inheritDoc}
+			 */
+			@Override
+			public ProtocolEncoder getEncoder(IoSession session) throws Exception {
+				return encoder;
+			}
+
+			/**
+			 * {@inheritDoc}
+			 */
+			@Override
+			public ProtocolDecoder getDecoder(IoSession session) throws Exception {
+				return decoder;
+			}
+		};
+	}
+
+	/**
+	 * Get the encoder instance from a given session.
+	 *
+	 * @param session The associated session we will get the encoder from
+	 * @return The encoder instance, if any
+	 */
+	public ProtocolEncoder getEncoder(IoSession session) {
+		return (ProtocolEncoder) session.getAttribute(ENCODER);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
+		if (parent.contains(this)) {
+			throw new IllegalArgumentException(
+					"You can't add the same filter instance more than once.  Create another instance and add it.");
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
+		// Clean everything
+		disposeCodec(parent.getSession());
+	}
+
+	/**
+	 * Process the incoming message, calling the session decoder. As the incoming
+	 * buffer might contains more than one messages, we have to loop until the
+	 * decoder throws an exception.
+	 * 
+	 * while ( buffer not empty ) try decode ( buffer ) catch break;
+	 * 
+	 */
+	@Override
+	public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message)
+			throws Exception {
+		if (LOGGER.isDebugEnabled()) {
+			LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
+		}
+
+		if (!(message instanceof IoBuffer)) {
+			nextFilter.messageReceived(session, message);
+			return;
+		}
+
+		final IoBuffer in = (IoBuffer) message;
+		final ProtocolDecoder decoder = factory.getDecoder(session);
+		final ProtocolDecoderOutputImpl decoderOut = DECODER_OUTPUT.get();
+
+		// Loop until we don't have anymore byte in the buffer,
+		// or until the decoder throws an unrecoverable exception or
+		// can't decoder a message, because there are not enough
+		// data in the buffer
+		while (in.hasRemaining()) {
+			int oldPos = in.position();
+			try {
+				// Call the decoder with the read bytes
+				decoder.decode(session, in, decoderOut);
+				// Finish decoding if no exception was thrown.
+				decoderOut.flush(nextFilter, session);
+			} catch (Exception e) {
+				ProtocolDecoderException pde;
+				if (e instanceof ProtocolDecoderException) {
+					pde = (ProtocolDecoderException) e;
+				} else {
+					pde = new ProtocolDecoderException(e);
+				}
+				if (pde.getHexdump() == null) {
+					// Generate a message hex dump
+					int curPos = in.position();
+					in.position(oldPos);
+					pde.setHexdump(in.getHexDump());
+					in.position(curPos);
+				}
+				// Fire the exceptionCaught event.
+				decoderOut.flush(nextFilter, session);
+				nextFilter.exceptionCaught(session, pde);
+				// Retry only if the type of the caught exception is
+				// recoverable and the buffer position has changed.
+				// We check buffer position additionally to prevent an
+				// infinite loop.
+				if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
+					break;
+				}
+			}
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
+		if (writeRequest instanceof EncodedWriteRequest) {
+			return;
+		}
+
+		nextFilter.messageSent(session, writeRequest);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
+			throws Exception {
+		final Object message = writeRequest.getMessage();
+
+		// Bypass the encoding if the message is contained in a IoBuffer,
+		// as it has already been encoded before
+		if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
 			nextFilter.filterWrite(session, writeRequest);
-		    } else {
-			SocketAddress destination = writeRequest.getDestination();
-			WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
-			nextFilter.filterWrite(session, encodedWriteRequest);
-		    }
+			return;
+		}
+
+		// Get the encoder in the session
+		final ProtocolEncoder encoder = factory.getEncoder(session);
+		final ProtocolEncoderOutputImpl encoderOut = ENCODER_OUTPUT.get();
+
+		if (encoder == null) {
+			throw new ProtocolEncoderException("The encoder is null for the session " + session);
+		}
+
+		try {
+			// Now we can try to encode the response
+			encoder.encode(session, message, encoderOut);
+
+			final Queue<Object> queue = encoderOut.messageQueue;
+
+			if (queue.isEmpty()) {
+				// Write empty message to ensure that messageSent is fired later
+				writeRequest.setMessage(EMPTY_BUFFER);
+				nextFilter.filterWrite(session, writeRequest);
+			} else {
+				// Write all the encoded messages now
+				Object encodedMessage = null;
+
+				while ((encodedMessage = queue.poll()) != null) {
+					if (queue.isEmpty()) {
+						// Write last message using original WriteRequest to ensure that any Future and
+						// dependency on messageSent event is emitted correctly
+						writeRequest.setMessage(encodedMessage);
+						nextFilter.filterWrite(session, writeRequest);
+					} else {
+						SocketAddress destination = writeRequest.getDestination();
+						WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
+						nextFilter.filterWrite(session, encodedWriteRequest);
+					}
+				}
+			}
+		} catch (final ProtocolEncoderException e) {
+			throw e;
+		} catch (final Exception e) {
+			// Generate the correct exception
+			throw new ProtocolEncoderException(e);
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
+		// Call finishDecode() first when a connection is closed.
+		ProtocolDecoder decoder = factory.getDecoder(session);
+		ProtocolDecoderOutput decoderOut = DECODER_OUTPUT.get();
+
+		try {
+			decoder.finishDecode(session, decoderOut);
+		} catch (Exception e) {
+			ProtocolDecoderException pde;
+			if (e instanceof ProtocolDecoderException) {
+				pde = (ProtocolDecoderException) e;
+			} else {
+				pde = new ProtocolDecoderException(e);
+			}
+			throw pde;
+		} finally {
+			// Dispose everything
+			disposeCodec(session);
+			decoderOut.flush(nextFilter, session);
+		}
+
+		// Call the next filter
+		nextFilter.sessionClosed(session);
+	}
+
+	private static class EncodedWriteRequest extends DefaultWriteRequest {
+		public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
+			super(encodedMessage, future, destination);
+		}
+
+		/**
+		 * {@inheritDoc}
+		 */
+		@Override
+		public boolean isEncoded() {
+			return true;
+		}
+	}
+
+	private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
+		public ProtocolDecoderOutputImpl() {
+			// Do nothing
+		}
+	}
+
+	private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
+		public ProtocolEncoderOutputImpl() {
+			// Do nothing
+		}
+	}
+
+	// ----------- Helper methods ---------------------------------------------
+	/**
+	 * Dispose the encoder, decoder, and the callback for the decoded messages.
+	 */
+	private void disposeCodec(IoSession session) {
+		// We just remove the two instances of encoder/decoder to release resources
+		// from the session
+		disposeEncoder(session);
+		disposeDecoder(session);
+	}
+
+	/**
+	 * Dispose the encoder, removing its instance from the session's attributes, and
+	 * calling the associated dispose method.
+	 */
+	private void disposeEncoder(IoSession session) {
+		ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
+		if (encoder == null) {
+			return;
+		}
+
+		try {
+			encoder.dispose(session);
+		} catch (Exception e) {
+			LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
+		}
+	}
+
+	/**
+	 * Dispose the decoder, removing its instance from the session's attributes, and
+	 * calling the associated dispose method.
+	 */
+	private void disposeDecoder(IoSession session) {
+		ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
+		if (decoder == null) {
+			return;
+		}
+
+		try {
+			decoder.dispose(session);
+		} catch (Exception e) {
+			LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
+		}
+	}
+
+	static private class ProtocolDecoderOutputLocal extends ThreadLocal<ProtocolDecoderOutputImpl> {
+		@Override
+		protected ProtocolDecoderOutputImpl initialValue() {
+			return new ProtocolDecoderOutputImpl();
+		}
+	}
+
+	static private class ProtocolEncoderOutputLocal extends ThreadLocal<ProtocolEncoderOutputImpl> {
+		@Override
+		protected ProtocolEncoderOutputImpl initialValue() {
+			return new ProtocolEncoderOutputImpl();
 		}
-	    }
-        } catch (Exception e) {
-            ProtocolEncoderException pee;
-
-            // Generate the correct exception
-            if (e instanceof ProtocolEncoderException) {
-                pee = (ProtocolEncoderException) e;
-            } else {
-                pee = new ProtocolEncoderException(e);
-            }
-
-            throw pee;
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
-        // Call finishDecode() first when a connection is closed.
-        ProtocolDecoder decoder = factory.getDecoder(session);
-        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
-
-        try {
-            decoder.finishDecode(session, decoderOut);
-        } catch (Exception e) {
-            ProtocolDecoderException pde;
-            if (e instanceof ProtocolDecoderException) {
-                pde = (ProtocolDecoderException) e;
-            } else {
-                pde = new ProtocolDecoderException(e);
-            }
-            throw pde;
-        } finally {
-            // Dispose everything
-            disposeCodec(session);
-            decoderOut.flush(nextFilter, session);
-        }
-
-        // Call the next filter
-        nextFilter.sessionClosed(session);
-    }
-
-    private static class EncodedWriteRequest extends DefaultWriteRequest {
-        public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
-            super(encodedMessage, future, destination);
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public boolean isEncoded() {
-            return true;
-        }
-    }
-
-    private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
-        public ProtocolDecoderOutputImpl() {
-            // Do nothing
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public void flush(NextFilter nextFilter, IoSession session) {
-            Queue<Object> messageQueue = getMessageQueue();
-
-            while (!messageQueue.isEmpty()) {
-                nextFilter.messageReceived(session, messageQueue.poll());
-            }
-        }
-    }
-
-    private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
-        private final IoSession session;
-
-        private final NextFilter nextFilter;
-
-        /** The WriteRequest destination */
-        private final SocketAddress destination;
-
-        public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
-            this.session = session;
-            this.nextFilter = nextFilter;
-
-            // Only store the destination, not the full WriteRequest.
-            destination = writeRequest.getDestination();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public WriteFuture flush() {
-            Queue<Object> bufferQueue = getMessageQueue();
-            WriteFuture future = null;
-
-            while (!bufferQueue.isEmpty()) {
-                Object encodedMessage = bufferQueue.poll();
-
-                if (encodedMessage == null) {
-                    break;
-                }
-
-                // Flush only when the buffer has remaining.
-                if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
-                    future = new DefaultWriteFuture(session);
-                    nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
-                }
-            }
-
-            if (future == null) {
-                // Creates an empty writeRequest containing the destination
-                future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST));
-            }
-
-            return future;
-        }
-    }
-
-    //----------- Helper methods ---------------------------------------------
-    /**
-     * Dispose the encoder, decoder, and the callback for the decoded
-     * messages.
-     */
-    private void disposeCodec(IoSession session) {
-        // We just remove the two instances of encoder/decoder to release resources
-        // from the session
-        disposeEncoder(session);
-        disposeDecoder(session);
-
-        // We also remove the callback
-        disposeDecoderOut(session);
-    }
-
-    /**
-     * Dispose the encoder, removing its instance from the
-     * session's attributes, and calling the associated
-     * dispose method.
-     */
-    private void disposeEncoder(IoSession session) {
-        ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
-        if (encoder == null) {
-            return;
-        }
-
-        try {
-            encoder.dispose(session);
-        } catch (Exception e) {
-            LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
-        }
-    }
-
-    /**
-     * Dispose the decoder, removing its instance from the
-     * session's attributes, and calling the associated
-     * dispose method.
-     */
-    private void disposeDecoder(IoSession session) {
-        ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
-        if (decoder == null) {
-            return;
-        }
-
-        try {
-            decoder.dispose(session);
-        } catch (Exception e) {
-            LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
-        }
-    }
-
-    /**
-     * Return a reference to the decoder callback. If it's not already created
-     * and stored into the session, we create a new instance.
-     */
-    private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
-        ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
-
-        if (out == null) {
-            // Create a new instance, and stores it into the session
-            out = new ProtocolDecoderOutputImpl();
-            session.setAttribute(DECODER_OUT, out);
-        }
-
-        return out;
-    }
-
-    private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
-        ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
-
-        if (out == null) {
-            // Create a new instance, and stores it into the session
-            out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
-            session.setAttribute(ENCODER_OUT, out);
-        }
-
-        return out;
-    }
-
-    /**
-     * Remove the decoder callback from the session's attributes.
-     */
-    private void disposeDecoderOut(IoSession session) {
-        session.removeAttribute(DECODER_OUT);
-    }
+	}
 }
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
index 2b5f89c..1638491 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
@@ -59,17 +59,8 @@ import org.apache.mina.core.session.IoSession;
  */
 public class ProtocolCodecSession extends DummySession {
 
-    private final WriteFuture notWrittenFuture = DefaultWriteFuture.newNotWrittenFuture(this,
-            new UnsupportedOperationException());
-
     private final AbstractProtocolEncoderOutput encoderOutput = new AbstractProtocolEncoderOutput() {
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public WriteFuture flush() {
-            return notWrittenFuture;
-        }
+
     };
 
     private final AbstractProtocolDecoderOutput decoderOutput = new AbstractProtocolDecoderOutput() {
@@ -101,7 +92,7 @@ public class ProtocolCodecSession extends DummySession {
      * @return the {@link Queue} of the buffered encoder output.
      */
     public Queue<Object> getEncoderOutputQueue() {
-        return encoderOutput.getMessageQueue();
+        return encoderOutput.messageQueue;
     }
 
     /**
@@ -116,6 +107,6 @@ public class ProtocolCodecSession extends DummySession {
      * @return the {@link Queue} of the buffered decoder output.
      */
     public Queue<Object> getDecoderOutputQueue() {
-        return decoderOutput.getMessageQueue();
+        return decoderOutput.messageQueue;
     }
 }
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
index 0fc847c..508ee23 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
@@ -21,44 +21,22 @@ package org.apache.mina.filter.codec;
 
 import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.file.FileRegion;
-import org.apache.mina.core.future.WriteFuture;
 
 /**
  * Callback for {@link ProtocolEncoder} to generate encoded messages such as
- * {@link IoBuffer}s.  {@link ProtocolEncoder} must call {@link #write(Object)}
+ * {@link IoBuffer}s. {@link ProtocolEncoder} must call {@link #write(Object)}
  * for each encoded message.
  *
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public interface ProtocolEncoderOutput {
-    /**
-     * Callback for {@link ProtocolEncoder} to generate an encoded message such
-     * as an {@link IoBuffer}. {@link ProtocolEncoder} must call
-     * {@link #write(Object)} for each encoded message.
-     *
-     * @param encodedMessage the encoded message, typically an {@link IoBuffer}
-     *                       or a {@link FileRegion}.
-     */
-    void write(Object encodedMessage);
-
-    /**
-     * Merges all buffers you wrote via {@link #write(Object)} into
-     * one {@link IoBuffer} and replaces the old fragmented ones with it.
-     * This method is useful when you want to control the way MINA generates
-     * network packets.  Please note that this method only works when you
-     * called {@link #write(Object)} method with only {@link IoBuffer}s.
-     * 
-     * @throws IllegalStateException if you wrote something else than {@link IoBuffer}
-     */
-    void mergeAll();
-
-    /**
-     * Flushes all buffers you wrote via {@link #write(Object)} to
-     * the session.  This operation is asynchronous; please wait for
-     * the returned {@link WriteFuture} if you want to wait for
-     * the buffers flushed.
-     *
-     * @return <tt>null</tt> if there is nothing to flush at all.
-     */
-    WriteFuture flush();
+	/**
+	 * Callback for {@link ProtocolEncoder} to generate an encoded message such as
+	 * an {@link IoBuffer}. {@link ProtocolEncoder} must call {@link #write(Object)}
+	 * for each encoded message.
+	 *
+	 * @param message the encoded message, typically an {@link IoBuffer} or a
+	 *                {@link FileRegion}.
+	 */
+	void write(Object message);
 }
\ No newline at end of file
diff --git a/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java b/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java
index 87b886d..f8497b8 100644
--- a/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java
+++ b/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java
@@ -25,9 +25,9 @@ import static org.junit.Assert.assertTrue;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetEncoder;
+import java.util.Queue;
 
 import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.filterchain.IoFilter.NextFilter;
 import org.apache.mina.core.session.DummySession;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.filter.codec.AbstractProtocolDecoderOutput;
@@ -37,262 +37,250 @@ import org.apache.mina.http.api.HttpRequest;
 import org.junit.Test;
 
 public class HttpServerDecoderTest {
-    private static final CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder(); //$NON-NLS-1$
+	private static final CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder(); //$NON-NLS-1$
 
-    private static final ProtocolDecoder decoder = new HttpServerDecoder();
+	private static final ProtocolDecoder decoder = new HttpServerDecoder();
 
-    /*
-     * Use a single session for all requests in order to test state management better
-     */
-    private static IoSession session = new DummySession();
+	/*
+	 * Use a single session for all requests in order to test state management
+	 * better
+	 */
+	private static IoSession session = new DummySession();
 
-    /**
-     * Build an IO buffer containing a simple minimal HTTP request.
-     * 
-     * @param method the HTTP method
-     * @param body the option body
-     * @return the built IO buffer
-     * @throws CharacterCodingException if encoding fails
-     */
-    protected static IoBuffer getRequestBuffer(String method, String body) throws CharacterCodingException {
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString(method + " / HTTP/1.1\r\nHost: dummy\r\n", encoder);
-        
-        if (body != null) {
-            buffer.putString("Content-Length: " + body.length() + "\r\n\r\n", encoder);
-            buffer.putString(body, encoder);
-        } else {
-            buffer.putString("\r\n", encoder);
-        }
-        
-        buffer.rewind();
-        
-        return buffer;
-    }
+	/**
+	 * Build an IO buffer containing a simple minimal HTTP request.
+	 * 
+	 * @param method the HTTP method
+	 * @param body   the option body
+	 * @return the built IO buffer
+	 * @throws CharacterCodingException if encoding fails
+	 */
+	protected static IoBuffer getRequestBuffer(String method, String body) throws CharacterCodingException {
+		IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString(method + " / HTTP/1.1\r\nHost: dummy\r\n", encoder);
 
-    protected static IoBuffer getRequestBuffer(String method) throws CharacterCodingException {
-        return getRequestBuffer(method, null);
-    }
+		if (body != null) {
+			buffer.putString("Content-Length: " + body.length() + "\r\n\r\n", encoder);
+			buffer.putString(body, encoder);
+		} else {
+			buffer.putString("\r\n", encoder);
+		}
 
-    /**
-     * Execute an HTPP request and return the queue of messages.
-     * 
-     * @param method the HTTP method
-     * @param body the optional body
-     * @return the protocol output and its queue of messages
-     * @throws Exception if error occurs (encoding,...)
-     */
-    protected static AbstractProtocolDecoderOutput executeRequest(String method, String body) throws Exception {
-        AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
+		buffer.rewind();
 
-        IoBuffer buffer = getRequestBuffer(method, body); //$NON-NLS-1$
-        
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        
-        return out;
-    }
+		return buffer;
+	}
 
-    @Test
-    public void testGetRequestWithoutBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("GET", null);
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	protected static IoBuffer getRequestBuffer(String method) throws CharacterCodingException {
+		return getRequestBuffer(method, null);
+	}
 
-    @Test
-    public void testGetRequestBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("GET", "body");
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	protected static class ProtocolDecoderQueue extends AbstractProtocolDecoderOutput {
+		public Queue<Object> getQueue() {
+			return this.messageQueue;
+		}
+	}
 
-    @Test
-    public void testPutRequestWithoutBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("PUT", null);
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	/**
+	 * Execute an HTPP request and return the queue of messages.
+	 * 
+	 * @param method the HTTP method
+	 * @param body   the optional body
+	 * @return the protocol output and its queue of messages
+	 * @throws Exception if error occurs (encoding,...)
+	 */
+	protected static ProtocolDecoderQueue executeRequest(String method, String body) throws Exception {
+		ProtocolDecoderQueue out = new ProtocolDecoderQueue();
 
-    @Test
-    public void testPutRequestBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("PUT", "body");
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+		IoBuffer buffer = getRequestBuffer(method, body); // $NON-NLS-1$
 
-    @Test
-    public void testPostRequestWithoutBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("POST", null);
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
 
-    @Test
-    public void testPostRequestBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("POST", "body");
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+		return out;
+	}
 
-    @Test
-    public void testDeleteRequestWithoutBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("DELETE", null);
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	@Test
+	public void testGetRequestWithoutBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("GET", null);
+		assertEquals(2, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
 
-    @Test
-    public void testDeleteRequestBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("DELETE", "body");
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
-    
-    @Test
-    public void testDIRMINA965NoContent() throws Exception {
-        AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("dummy\r\n\r\n", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	@Test
+	public void testGetRequestBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("GET", "body");
+		assertEquals(3, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof IoBuffer);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
 
-    @Test
-    public void testDIRMINA965WithContent() throws Exception {
-        AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("dummy\r\nContent-Length: 1\r\n\r\nA", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
-    @Test
-    public void testDIRMINA965WithContentOnTwoChunks() throws Exception {
-        AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("dummy\r\nContent-Length: 2\r\n\r\nA", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("B", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(4, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
-    
-    @Test
-    public void verifyThatHeaderWithoutLeadingSpaceIsSupported() throws Exception {
-        AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.0\r\nHost:localhost\r\n\r\n", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(2, out.getMessageQueue().size());
-        HttpRequest request = (HttpRequest) out.getMessageQueue().poll();
-        assertEquals("localhost", request.getHeader("host"));
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	@Test
+	public void testPutRequestWithoutBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("PUT", null);
+		assertEquals(2, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
 
-    @Test
-    public void verifyThatLeadingSpacesAreRemovedFromHeader() throws Exception {
-        AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.0\r\nHost:  localhost\r\n\r\n", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(2, out.getMessageQueue().size());
-        HttpRequest request = (HttpRequest) out.getMessageQueue().poll();
-        assertEquals("localhost", request.getHeader("host"));
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	@Test
+	public void testPutRequestBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("PUT", "body");
+		assertEquals(3, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof IoBuffer);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
 
-    @Test
-    public void verifyThatTrailingSpacesAreRemovedFromHeader() throws Exception {
-        AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.0\r\nHost:localhost  \r\n\r\n", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(2, out.getMessageQueue().size());
-        HttpRequest request = (HttpRequest) out.getMessageQueue().poll();
-        assertEquals("localhost", request.getHeader("host"));
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	@Test
+	public void testPostRequestWithoutBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("POST", null);
+		assertEquals(2, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void testPostRequestBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("POST", "body");
+		assertEquals(3, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof IoBuffer);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void testDeleteRequestWithoutBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("DELETE", null);
+		assertEquals(2, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void testDeleteRequestBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("DELETE", "body");
+		assertEquals(3, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof IoBuffer);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void testDIRMINA965NoContent() throws Exception {
+		ProtocolDecoderQueue out = new ProtocolDecoderQueue();
+		IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("dummy\r\n\r\n", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		assertEquals(2, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void testDIRMINA965WithContent() throws Exception {
+		ProtocolDecoderQueue out = new ProtocolDecoderQueue();
+		IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("dummy\r\nContent-Length: 1\r\n\r\nA", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+
+		assertEquals(3, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof IoBuffer);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void testDIRMINA965WithContentOnTwoChunks() throws Exception {
+		ProtocolDecoderQueue out = new ProtocolDecoderQueue();
+		IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("dummy\r\nContent-Length: 2\r\n\r\nA", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("B", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		assertEquals(4, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof IoBuffer);
+		assertTrue(out.getQueue().poll() instanceof IoBuffer);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void verifyThatHeaderWithoutLeadingSpaceIsSupported() throws Exception {
+		ProtocolDecoderQueue out = new ProtocolDecoderQueue();
+		IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("GET / HTTP/1.0\r\nHost:localhost\r\n\r\n", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		assertEquals(2, out.getQueue().size());
+		HttpRequest request = (HttpRequest) out.getQueue().poll();
+		assertEquals("localhost", request.getHeader("host"));
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void verifyThatLeadingSpacesAreRemovedFromHeader() throws Exception {
+		ProtocolDecoderQueue out = new ProtocolDecoderQueue();
+		IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("GET / HTTP/1.0\r\nHost:  localhost\r\n\r\n", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		assertEquals(2, out.getQueue().size());
+		HttpRequest request = (HttpRequest) out.getQueue().poll();
+		assertEquals("localhost", request.getHeader("host"));
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void verifyThatTrailingSpacesAreRemovedFromHeader() throws Exception {
+		ProtocolDecoderQueue out = new ProtocolDecoderQueue();
+		IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("GET / HTTP/1.0\r\nHost:localhost  \r\n\r\n", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		assertEquals(2, out.getQueue().size());
+		HttpRequest request = (HttpRequest) out.getQueue().poll();
+		assertEquals("localhost", request.getHeader("host"));
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
 }