You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jo...@apache.org on 2021/06/02 00:40:27 UTC

[mina] 02/02: initial work on parallel codec api

This is an automated email from the ASF dual-hosted git repository.

johnnyv pushed a commit to branch bugfix/DIRMINA-1142
in repository https://gitbox.apache.org/repos/asf/mina.git

commit 024f23db166c83dcd359221f6baea2c0d0505db2
Author: Jonathan Valliere <jo...@apache.org>
AuthorDate: Tue Jun 1 20:39:37 2021 -0400

    initial work on parallel codec api
---
 .../codec/AbstractProtocolDecoderOutput.java       |  66 +-
 .../codec/AbstractProtocolEncoderOutput.java       | 101 +--
 .../mina/filter/codec/ProtocolCodecFilter.java     | 918 +++++++++------------
 .../mina/filter/codec/ProtocolCodecSession.java    |  15 +-
 .../mina/filter/codec/ProtocolEncoderOutput.java   |  42 +-
 .../apache/mina/http/HttpServerDecoderTest.java    | 472 ++++++-----
 6 files changed, 714 insertions(+), 900 deletions(-)

diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java
index 23a54c0..2997e6a 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java
@@ -19,41 +19,49 @@
  */
 package org.apache.mina.filter.codec;
 
-import java.util.LinkedList;
+import java.util.ArrayDeque;
 import java.util.Queue;
 
+import org.apache.mina.core.filterchain.IoFilter.NextFilter;
+import org.apache.mina.core.session.IoSession;
+
 /**
  * A {@link ProtocolDecoderOutput} based on queue.
  *
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public abstract class AbstractProtocolDecoderOutput implements ProtocolDecoderOutput {
-    /** The queue where decoded messages are stored */
-    private final Queue<Object> messageQueue = new LinkedList<>();
-
-    /**
-     * Creates a new instance of a AbstractProtocolDecoderOutput
-     */
-    public AbstractProtocolDecoderOutput() {
-        // Do nothing
-    }
-
-    /**
-     * @return The decoder's message queue
-     */
-    public Queue<Object> getMessageQueue() {
-        return messageQueue;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void write(Object message) {
-        if (message == null) {
-            throw new IllegalArgumentException("message");
-        }
-
-        messageQueue.add(message);
-    }
+	/** The queue where decoded messages are stored */
+	protected final Queue<Object> messageQueue = new ArrayDeque<>();
+
+	/**
+	 * Creates a new instance of an AbstractProtocolDecoderOutput
+	 */
+	public AbstractProtocolDecoderOutput() {
+		// Do nothing
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void write(Object message) {
+		if (message == null) {
+			throw new IllegalArgumentException("message");
+		}
+
+		messageQueue.add(message);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void flush(NextFilter nextFilter, IoSession session) {
+		Object message = null;
+
+		while ((message = messageQueue.poll()) != null) {
+			nextFilter.messageReceived(session, message);
+		}
+	}
 }
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java
index e369ba9..58b8852 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java
@@ -19,10 +19,8 @@
  */
 package org.apache.mina.filter.codec;
 
+import java.util.ArrayDeque;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.mina.core.buffer.IoBuffer;
 
 /**
  * A {@link ProtocolEncoderOutput} based on queue.
@@ -30,80 +28,25 @@ import org.apache.mina.core.buffer.IoBuffer;
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public abstract class AbstractProtocolEncoderOutput implements ProtocolEncoderOutput {
-    /** The queue where the decoded messages are stored */
-    private final Queue<Object> messageQueue = new ConcurrentLinkedQueue<>();
-
-    private boolean buffersOnly = true;
-
-    /**
-     * Creates an instance of AbstractProtocolEncoderOutput
-     */
-    public AbstractProtocolEncoderOutput() {
-        // Do nothing
-    }
-
-    /**
-     * @return The message queue
-     */
-    public Queue<Object> getMessageQueue() {
-        return messageQueue;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void write(Object encodedMessage) {
-        if (encodedMessage instanceof IoBuffer) {
-            IoBuffer buf = (IoBuffer) encodedMessage;
-            if (buf.hasRemaining()) {
-                messageQueue.offer(buf);
-            } else {
-                throw new IllegalArgumentException("buf is empty. Forgot to call flip()?");
-            }
-        } else {
-            messageQueue.offer(encodedMessage);
-            buffersOnly = false;
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void mergeAll() {
-        if (!buffersOnly) {
-            throw new IllegalStateException("the encoded message list contains a non-buffer.");
-        }
-
-        final int size = messageQueue.size();
-
-        if (size < 2) {
-            // no need to merge!
-            return;
-        }
-
-        // Get the size of merged BB
-        int sum = 0;
-        for (Object b : messageQueue) {
-            sum += ((IoBuffer) b).remaining();
-        }
-
-        // Allocate a new BB that will contain all fragments
-        IoBuffer newBuf = IoBuffer.allocate(sum);
-
-        // and merge all.
-        for (;;) {
-            IoBuffer buf = (IoBuffer) messageQueue.poll();
-            if (buf == null) {
-                break;
-            }
-
-            newBuf.put(buf);
-        }
-
-        // Push the new buffer finally.
-        newBuf.flip();
-        messageQueue.add(newBuf);
-    }
+	/** The queue where the encoded messages are stored */
+	protected final Queue<Object> messageQueue = new ArrayDeque<>();
+
+	/**
+	 * Creates an instance of AbstractProtocolEncoderOutput
+	 */
+	public AbstractProtocolEncoderOutput() {
+		// Do nothing
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void write(Object message) {
+		if (message == null) {
+			throw new IllegalArgumentException("message");
+		}
+
+		messageQueue.offer(message);
+	}
 }
\ No newline at end of file
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
index a460b3d..93039e8 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
@@ -27,13 +27,10 @@ import org.apache.mina.core.file.FileRegion;
 import org.apache.mina.core.filterchain.IoFilter;
 import org.apache.mina.core.filterchain.IoFilterAdapter;
 import org.apache.mina.core.filterchain.IoFilterChain;
-import org.apache.mina.core.future.DefaultWriteFuture;
 import org.apache.mina.core.future.WriteFuture;
-import org.apache.mina.core.session.AbstractIoSession;
 import org.apache.mina.core.session.AttributeKey;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.core.write.DefaultWriteRequest;
-import org.apache.mina.core.write.NothingWrittenException;
 import org.apache.mina.core.write.WriteRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,509 +44,418 @@ import org.slf4j.LoggerFactory;
  * @org.apache.xbean.XBean
  */
 public class ProtocolCodecFilter extends IoFilterAdapter {
-    /** A logger for this class */
-    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
-
-    private static final Class<?>[] EMPTY_PARAMS = new Class[0];
-
-    private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
-
-    private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
-
-    private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
-
-    private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
-
-    private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
-
-    /** The factory responsible for creating the encoder and decoder */
-    private final ProtocolCodecFactory factory;
-
-    /**
-     * Creates a new instance of ProtocolCodecFilter, associating a factory
-     * for the creation of the encoder and decoder.
-     *
-     * @param factory The associated factory
-     */
-    public ProtocolCodecFilter(ProtocolCodecFactory factory) {
-        if (factory == null) {
-            throw new IllegalArgumentException("factory");
-        }
-
-        this.factory = factory;
-    }
-
-    /**
-     * Creates a new instance of ProtocolCodecFilter, without any factory.
-     * The encoder/decoder factory will be created as an inner class, using
-     * the two parameters (encoder and decoder).
-     * 
-     * @param encoder The class responsible for encoding the message
-     * @param decoder The class responsible for decoding the message
-     */
-    public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
-        if (encoder == null) {
-            throw new IllegalArgumentException("encoder");
-        }
-        if (decoder == null) {
-            throw new IllegalArgumentException("decoder");
-        }
-
-        // Create the inner Factory based on the two parameters
-        this.factory = new ProtocolCodecFactory() {
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public ProtocolEncoder getEncoder(IoSession session) {
-                return encoder;
-            }
-
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public ProtocolDecoder getDecoder(IoSession session) {
-                return decoder;
-            }
-        };
-    }
-
-    /**
-     * Creates a new instance of ProtocolCodecFilter, without any factory.
-     * The encoder/decoder factory will be created as an inner class, using
-     * the two parameters (encoder and decoder), which are class names. Instances
-     * for those classes will be created in this constructor.
-     * 
-     * @param encoderClass The class responsible for encoding the message
-     * @param decoderClass The class responsible for decoding the message
-     */
-    public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
-            final Class<? extends ProtocolDecoder> decoderClass) {
-        if (encoderClass == null) {
-            throw new IllegalArgumentException("encoderClass");
-        }
-        if (decoderClass == null) {
-            throw new IllegalArgumentException("decoderClass");
-        }
-        if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
-            throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
-        }
-        if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
-            throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
-        }
-        try {
-            encoderClass.getConstructor(EMPTY_PARAMS);
-        } catch (NoSuchMethodException e) {
-            throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
-        }
-        try {
-            decoderClass.getConstructor(EMPTY_PARAMS);
-        } catch (NoSuchMethodException e) {
-            throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
-        }
-
-        final ProtocolEncoder encoder;
-
-        try {
-            encoder = encoderClass.newInstance();
-        } catch (Exception e) {
-            throw new IllegalArgumentException("encoderClass cannot be initialized");
-        }
-
-        final ProtocolDecoder decoder;
-
-        try {
-            decoder = decoderClass.newInstance();
-        } catch (Exception e) {
-            throw new IllegalArgumentException("decoderClass cannot be initialized");
-        }
-
-        // Create the inner factory based on the two parameters.
-        this.factory = new ProtocolCodecFactory() {
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public ProtocolEncoder getEncoder(IoSession session) throws Exception {
-                return encoder;
-            }
-
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public ProtocolDecoder getDecoder(IoSession session) throws Exception {
-                return decoder;
-            }
-        };
-    }
-
-    /**
-     * Get the encoder instance from a given session.
-     *
-     * @param session The associated session we will get the encoder from
-     * @return The encoder instance, if any
-     */
-    public ProtocolEncoder getEncoder(IoSession session) {
-        return (ProtocolEncoder) session.getAttribute(ENCODER);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
-        if (parent.contains(this)) {
-            throw new IllegalArgumentException(
-                    "You can't add the same filter instance more than once.  Create another instance and add it.");
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
-        // Clean everything
-        disposeCodec(parent.getSession());
-    }
-
-    /**
-     * Process the incoming message, calling the session decoder. As the incoming
-     * buffer might contains more than one messages, we have to loop until the decoder
-     * throws an exception.
-     * 
-     *  while ( buffer not empty )
-     *    try
-     *      decode ( buffer )
-     *    catch
-     *      break;
-     * 
-     */
-    @Override
-    public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
-        }
-
-        if (!(message instanceof IoBuffer)) {
-            nextFilter.messageReceived(session, message);
-            return;
-        }
-
-        IoBuffer in = (IoBuffer) message;
-        ProtocolDecoder decoder = factory.getDecoder(session);
-        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
-
-        // Loop until we don't have anymore byte in the buffer,
-        // or until the decoder throws an unrecoverable exception or
-        // can't decoder a message, because there are not enough
-        // data in the buffer
-        while (in.hasRemaining()) {
-            int oldPos = in.position();
-            try {
-                synchronized (session) {
-                    // Call the decoder with the read bytes
-                    decoder.decode(session, in, decoderOut);
-                }
-                // Finish decoding if no exception was thrown.
-                decoderOut.flush(nextFilter, session);
-            } catch (Exception e) {
-                ProtocolDecoderException pde;
-                if (e instanceof ProtocolDecoderException) {
-                    pde = (ProtocolDecoderException) e;
-                } else {
-                    pde = new ProtocolDecoderException(e);
-                }
-                if (pde.getHexdump() == null) {
-                    // Generate a message hex dump
-                    int curPos = in.position();
-                    in.position(oldPos);
-                    pde.setHexdump(in.getHexDump());
-                    in.position(curPos);
-                }
-                // Fire the exceptionCaught event.
-                decoderOut.flush(nextFilter, session);
-                nextFilter.exceptionCaught(session, pde);
-                // Retry only if the type of the caught exception is
-                // recoverable and the buffer position has changed.
-                // We check buffer position additionally to prevent an
-                // infinite loop.
-                if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
-                    break;
-                }
-            }
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
-        if (writeRequest instanceof EncodedWriteRequest) {
-            return;
-        }
-
-        nextFilter.messageSent(session, writeRequest);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
-        Object message = writeRequest.getMessage();
-
-        // Bypass the encoding if the message is contained in a IoBuffer,
-        // as it has already been encoded before
-        if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
-            nextFilter.filterWrite(session, writeRequest);
-            return;
-        }
-
-        // Get the encoder in the session
-        ProtocolEncoder encoder = factory.getEncoder(session);
-
-        ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
-
-        if (encoder == null) {
-            throw new ProtocolEncoderException("The encoder is null for the session " + session);
-        }
-
-        try {
-            // Now we can try to encode the response
-            encoder.encode(session, message, encoderOut);
-
-            // Send it directly
-            Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
-
-            // Write all the encoded messages now
-            while (!bufferQueue.isEmpty()) {
-                Object encodedMessage = bufferQueue.poll();
-
-                if (encodedMessage == null) {
-                    break;
-                }
-
-		// Flush only when the buffer has remaining.
-		if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
-		    if (bufferQueue.isEmpty()) {
-			writeRequest.setMessage(encodedMessage);
+	/** A logger for this class */
+	private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
+
+	private static final Class<?>[] EMPTY_PARAMS = new Class[0];
+
+	private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
+
+	private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
+
+	private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
+
+	private static final ProtocolDecoderOutputLocal DECODER_OUTPUT = new ProtocolDecoderOutputLocal();
+
+	private static final ProtocolEncoderOutputLocal ENCODER_OUTPUT = new ProtocolEncoderOutputLocal();
+
+	/** The factory responsible for creating the encoder and decoder */
+	private final ProtocolCodecFactory factory;
+
+	/**
+	 * Creates a new instance of ProtocolCodecFilter, associating a factory for the
+	 * creation of the encoder and decoder.
+	 *
+	 * @param factory The associated factory
+	 */
+	public ProtocolCodecFilter(ProtocolCodecFactory factory) {
+		if (factory == null) {
+			throw new IllegalArgumentException("factory");
+		}
+
+		this.factory = factory;
+	}
+
+	/**
+	 * Creates a new instance of ProtocolCodecFilter, without any factory. The
+	 * encoder/decoder factory will be created as an inner class, using the two
+	 * parameters (encoder and decoder).
+	 * 
+	 * @param encoder The class responsible for encoding the message
+	 * @param decoder The class responsible for decoding the message
+	 */
+	public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
+		if (encoder == null) {
+			throw new IllegalArgumentException("encoder");
+		}
+		if (decoder == null) {
+			throw new IllegalArgumentException("decoder");
+		}
+
+		// Create the inner Factory based on the two parameters
+		this.factory = new ProtocolCodecFactory() {
+			/**
+			 * {@inheritDoc}
+			 */
+			@Override
+			public ProtocolEncoder getEncoder(IoSession session) {
+				return encoder;
+			}
+
+			/**
+			 * {@inheritDoc}
+			 */
+			@Override
+			public ProtocolDecoder getDecoder(IoSession session) {
+				return decoder;
+			}
+		};
+	}
+
+	/**
+	 * Creates a new instance of ProtocolCodecFilter, without any factory. The
+	 * encoder/decoder factory will be created as an inner class, using the two
+	 * parameters (encoder and decoder), which are class names. Instances for those
+	 * classes will be created in this constructor.
+	 * 
+	 * @param encoderClass The class responsible for encoding the message
+	 * @param decoderClass The class responsible for decoding the message
+	 */
+	public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
+			final Class<? extends ProtocolDecoder> decoderClass) {
+		if (encoderClass == null) {
+			throw new IllegalArgumentException("encoderClass");
+		}
+		if (decoderClass == null) {
+			throw new IllegalArgumentException("decoderClass");
+		}
+		if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
+			throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
+		}
+		if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
+			throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
+		}
+		try {
+			encoderClass.getConstructor(EMPTY_PARAMS);
+		} catch (NoSuchMethodException e) {
+			throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
+		}
+		try {
+			decoderClass.getConstructor(EMPTY_PARAMS);
+		} catch (NoSuchMethodException e) {
+			throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
+		}
+
+		final ProtocolEncoder encoder;
+
+		try {
+			encoder = encoderClass.newInstance();
+		} catch (Exception e) {
+			throw new IllegalArgumentException("encoderClass cannot be initialized");
+		}
+
+		final ProtocolDecoder decoder;
+
+		try {
+			decoder = decoderClass.newInstance();
+		} catch (Exception e) {
+			throw new IllegalArgumentException("decoderClass cannot be initialized");
+		}
+
+		// Create the inner factory based on the two parameters.
+		this.factory = new ProtocolCodecFactory() {
+			/**
+			 * {@inheritDoc}
+			 */
+			@Override
+			public ProtocolEncoder getEncoder(IoSession session) throws Exception {
+				return encoder;
+			}
+
+			/**
+			 * {@inheritDoc}
+			 */
+			@Override
+			public ProtocolDecoder getDecoder(IoSession session) throws Exception {
+				return decoder;
+			}
+		};
+	}
+
+	/**
+	 * Get the encoder instance from a given session.
+	 *
+	 * @param session The associated session we will get the encoder from
+	 * @return The encoder instance, if any
+	 */
+	public ProtocolEncoder getEncoder(IoSession session) {
+		return (ProtocolEncoder) session.getAttribute(ENCODER);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
+		if (parent.contains(this)) {
+			throw new IllegalArgumentException(
+					"You can't add the same filter instance more than once.  Create another instance and add it.");
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
+		// Clean everything
+		disposeCodec(parent.getSession());
+	}
+
+	/**
+	 * Process the incoming message, calling the session decoder. As the incoming
+	 * buffer might contain more than one message, we have to loop until the
+	 * buffer is drained or the decoder throws an unrecoverable exception.
+	 * 
+	 * while ( buffer not empty ) try decode ( buffer ) catch break;
+	 * 
+	 */
+	@Override
+	public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message)
+			throws Exception {
+		if (LOGGER.isDebugEnabled()) {
+			LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
+		}
+
+		if (!(message instanceof IoBuffer)) {
+			nextFilter.messageReceived(session, message);
+			return;
+		}
+
+		final IoBuffer in = (IoBuffer) message;
+		final ProtocolDecoder decoder = factory.getDecoder(session);
+		final ProtocolDecoderOutputImpl decoderOut = DECODER_OUTPUT.get();
+
+		// Loop until there are no more bytes in the buffer,
+		// or until the decoder throws an unrecoverable exception or
+		// can't decode a message because there is not enough
+		// data in the buffer
+		while (in.hasRemaining()) {
+			int oldPos = in.position();
+			try {
+				// Call the decoder with the read bytes
+				decoder.decode(session, in, decoderOut);
+				// Finish decoding if no exception was thrown.
+				decoderOut.flush(nextFilter, session);
+			} catch (Exception e) {
+				ProtocolDecoderException pde;
+				if (e instanceof ProtocolDecoderException) {
+					pde = (ProtocolDecoderException) e;
+				} else {
+					pde = new ProtocolDecoderException(e);
+				}
+				if (pde.getHexdump() == null) {
+					// Generate a message hex dump
+					int curPos = in.position();
+					in.position(oldPos);
+					pde.setHexdump(in.getHexDump());
+					in.position(curPos);
+				}
+				// Fire the exceptionCaught event.
+				decoderOut.flush(nextFilter, session);
+				nextFilter.exceptionCaught(session, pde);
+				// Retry only if the type of the caught exception is
+				// recoverable and the buffer position has changed.
+				// We check buffer position additionally to prevent an
+				// infinite loop.
+				if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
+					break;
+				}
+			}
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
+		if (writeRequest instanceof EncodedWriteRequest) {
+			return;
+		}
+
+		nextFilter.messageSent(session, writeRequest);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
+			throws Exception {
+		final Object message = writeRequest.getMessage();
+
+		// Bypass the encoding if the message is an IoBuffer or a FileRegion,
+		// as it has already been encoded before
+		if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
 			nextFilter.filterWrite(session, writeRequest);
-		    } else {
-			SocketAddress destination = writeRequest.getDestination();
-			WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
-			nextFilter.filterWrite(session, encodedWriteRequest);
-		    }
+			return;
+		}
+
+		// Get the encoder in the session
+		final ProtocolEncoder encoder = factory.getEncoder(session);
+		final ProtocolEncoderOutputImpl encoderOut = ENCODER_OUTPUT.get();
+
+		if (encoder == null) {
+			throw new ProtocolEncoderException("The encoder is null for the session " + session);
+		}
+
+		try {
+			// Now we can try to encode the response
+			encoder.encode(session, message, encoderOut);
+
+			final Queue<Object> queue = encoderOut.messageQueue;
+
+			if (queue.isEmpty()) {
+				// Write empty message to ensure that messageSent is fired later
+				writeRequest.setMessage(EMPTY_BUFFER);
+				nextFilter.filterWrite(session, writeRequest);
+			} else {
+				// Write all the encoded messages now
+				Object encodedMessage = null;
+
+				while ((encodedMessage = queue.poll()) != null) {
+					if (queue.isEmpty()) {
+						// Write last message using original WriteRequest to ensure that any Future and
+						// dependency on messageSent event is emitted correctly
+						writeRequest.setMessage(encodedMessage);
+						nextFilter.filterWrite(session, writeRequest);
+					} else {
+						SocketAddress destination = writeRequest.getDestination();
+						WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
+						nextFilter.filterWrite(session, encodedWriteRequest);
+					}
+				}
+			}
+		} catch (final ProtocolEncoderException e) {
+			throw e;
+		} catch (final Exception e) {
+			// Generate the correct exception
+			throw new ProtocolEncoderException(e);
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
+		// Call finishDecode() first when a connection is closed.
+		ProtocolDecoder decoder = factory.getDecoder(session);
+		ProtocolDecoderOutput decoderOut = DECODER_OUTPUT.get();
+
+		try {
+			decoder.finishDecode(session, decoderOut);
+		} catch (Exception e) {
+			ProtocolDecoderException pde;
+			if (e instanceof ProtocolDecoderException) {
+				pde = (ProtocolDecoderException) e;
+			} else {
+				pde = new ProtocolDecoderException(e);
+			}
+			throw pde;
+		} finally {
+			// Dispose everything
+			disposeCodec(session);
+			decoderOut.flush(nextFilter, session);
+		}
+
+		// Call the next filter
+		nextFilter.sessionClosed(session);
+	}
+
+	private static class EncodedWriteRequest extends DefaultWriteRequest {
+		public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
+			super(encodedMessage, future, destination);
+		}
+
+		/**
+		 * {@inheritDoc}
+		 */
+		@Override
+		public boolean isEncoded() {
+			return true;
+		}
+	}
+
+	private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
+		public ProtocolDecoderOutputImpl() {
+			// Do nothing
+		}
+	}
+
+	private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
+		public ProtocolEncoderOutputImpl() {
+			// Do nothing
+		}
+	}
+
+	// ----------- Helper methods ---------------------------------------------
+	/**
+	 * Dispose the encoder, decoder, and the callback for the decoded messages.
+	 */
+	private void disposeCodec(IoSession session) {
+		// We just remove the two instances of encoder/decoder to release resources
+		// from the session
+		disposeEncoder(session);
+		disposeDecoder(session);
+	}
+
+	/**
+	 * Removes the encoder stored under the ENCODER session attribute, if any, and
+	 * calls its dispose method; a failure to dispose is logged, not propagated.
+	 */
+	private void disposeEncoder(IoSession session) {
+		ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
+		if (encoder == null) {
+			return;
+		}
+
+		try {
+			encoder.dispose(session);
+		} catch (Exception e) {
+			LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')', e);
+		}
+	}
+
+	/**
+	 * Removes the decoder stored under the DECODER session attribute, if any, and
+	 * calls its dispose method; a failure to dispose is logged, not propagated.
+	 */
+	private void disposeDecoder(IoSession session) {
+		ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
+		if (decoder == null) {
+			return;
+		}
+
+		try {
+			decoder.dispose(session);
+		} catch (Exception e) {
+			LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')', e);
+		}
+	}
+
+	static private class ProtocolDecoderOutputLocal extends ThreadLocal<ProtocolDecoderOutputImpl> {
+		@Override
+		protected ProtocolDecoderOutputImpl initialValue() {
+			return new ProtocolDecoderOutputImpl();
+		}
+	}
+
+	static private class ProtocolEncoderOutputLocal extends ThreadLocal<ProtocolEncoderOutputImpl> {
+		@Override
+		protected ProtocolEncoderOutputImpl initialValue() {
+			return new ProtocolEncoderOutputImpl();
 		}
-	    }
-        } catch (Exception e) {
-            ProtocolEncoderException pee;
-
-            // Generate the correct exception
-            if (e instanceof ProtocolEncoderException) {
-                pee = (ProtocolEncoderException) e;
-            } else {
-                pee = new ProtocolEncoderException(e);
-            }
-
-            throw pee;
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
-        // Call finishDecode() first when a connection is closed.
-        ProtocolDecoder decoder = factory.getDecoder(session);
-        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
-
-        try {
-            decoder.finishDecode(session, decoderOut);
-        } catch (Exception e) {
-            ProtocolDecoderException pde;
-            if (e instanceof ProtocolDecoderException) {
-                pde = (ProtocolDecoderException) e;
-            } else {
-                pde = new ProtocolDecoderException(e);
-            }
-            throw pde;
-        } finally {
-            // Dispose everything
-            disposeCodec(session);
-            decoderOut.flush(nextFilter, session);
-        }
-
-        // Call the next filter
-        nextFilter.sessionClosed(session);
-    }
-
-    private static class EncodedWriteRequest extends DefaultWriteRequest {
-        public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
-            super(encodedMessage, future, destination);
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public boolean isEncoded() {
-            return true;
-        }
-    }
-
-    private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
-        public ProtocolDecoderOutputImpl() {
-            // Do nothing
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public void flush(NextFilter nextFilter, IoSession session) {
-            Queue<Object> messageQueue = getMessageQueue();
-
-            while (!messageQueue.isEmpty()) {
-                nextFilter.messageReceived(session, messageQueue.poll());
-            }
-        }
-    }
-
-    private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
-        private final IoSession session;
-
-        private final NextFilter nextFilter;
-
-        /** The WriteRequest destination */
-        private final SocketAddress destination;
-
-        public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
-            this.session = session;
-            this.nextFilter = nextFilter;
-
-            // Only store the destination, not the full WriteRequest.
-            destination = writeRequest.getDestination();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public WriteFuture flush() {
-            Queue<Object> bufferQueue = getMessageQueue();
-            WriteFuture future = null;
-
-            while (!bufferQueue.isEmpty()) {
-                Object encodedMessage = bufferQueue.poll();
-
-                if (encodedMessage == null) {
-                    break;
-                }
-
-                // Flush only when the buffer has remaining.
-                if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
-                    future = new DefaultWriteFuture(session);
-                    nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
-                }
-            }
-
-            if (future == null) {
-                // Creates an empty writeRequest containing the destination
-                future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST));
-            }
-
-            return future;
-        }
-    }
-
-    //----------- Helper methods ---------------------------------------------
-    /**
-     * Dispose the encoder, decoder, and the callback for the decoded
-     * messages.
-     */
-    private void disposeCodec(IoSession session) {
-        // We just remove the two instances of encoder/decoder to release resources
-        // from the session
-        disposeEncoder(session);
-        disposeDecoder(session);
-
-        // We also remove the callback
-        disposeDecoderOut(session);
-    }
-
-    /**
-     * Dispose the encoder, removing its instance from the
-     * session's attributes, and calling the associated
-     * dispose method.
-     */
-    private void disposeEncoder(IoSession session) {
-        ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
-        if (encoder == null) {
-            return;
-        }
-
-        try {
-            encoder.dispose(session);
-        } catch (Exception e) {
-            LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
-        }
-    }
-
-    /**
-     * Dispose the decoder, removing its instance from the
-     * session's attributes, and calling the associated
-     * dispose method.
-     */
-    private void disposeDecoder(IoSession session) {
-        ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
-        if (decoder == null) {
-            return;
-        }
-
-        try {
-            decoder.dispose(session);
-        } catch (Exception e) {
-            LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
-        }
-    }
-
-    /**
-     * Return a reference to the decoder callback. If it's not already created
-     * and stored into the session, we create a new instance.
-     */
-    private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
-        ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
-
-        if (out == null) {
-            // Create a new instance, and stores it into the session
-            out = new ProtocolDecoderOutputImpl();
-            session.setAttribute(DECODER_OUT, out);
-        }
-
-        return out;
-    }
-
-    private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
-        ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
-
-        if (out == null) {
-            // Create a new instance, and stores it into the session
-            out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
-            session.setAttribute(ENCODER_OUT, out);
-        }
-
-        return out;
-    }
-
-    /**
-     * Remove the decoder callback from the session's attributes.
-     */
-    private void disposeDecoderOut(IoSession session) {
-        session.removeAttribute(DECODER_OUT);
-    }
+	}
 }
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
index 2b5f89c..1638491 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
@@ -59,17 +59,8 @@ import org.apache.mina.core.session.IoSession;
  */
 public class ProtocolCodecSession extends DummySession {
 
-    private final WriteFuture notWrittenFuture = DefaultWriteFuture.newNotWrittenFuture(this,
-            new UnsupportedOperationException());
-
     private final AbstractProtocolEncoderOutput encoderOutput = new AbstractProtocolEncoderOutput() {
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public WriteFuture flush() {
-            return notWrittenFuture;
-        }
+
     };
 
     private final AbstractProtocolDecoderOutput decoderOutput = new AbstractProtocolDecoderOutput() {
@@ -101,7 +92,7 @@ public class ProtocolCodecSession extends DummySession {
      * @return the {@link Queue} of the buffered encoder output.
      */
     public Queue<Object> getEncoderOutputQueue() {
-        return encoderOutput.getMessageQueue();
+        return encoderOutput.messageQueue;
     }
 
     /**
@@ -116,6 +107,6 @@ public class ProtocolCodecSession extends DummySession {
      * @return the {@link Queue} of the buffered decoder output.
      */
     public Queue<Object> getDecoderOutputQueue() {
-        return decoderOutput.getMessageQueue();
+        return decoderOutput.messageQueue;
     }
 }
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
index 0fc847c..508ee23 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
@@ -21,44 +21,22 @@ package org.apache.mina.filter.codec;
 
 import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.file.FileRegion;
-import org.apache.mina.core.future.WriteFuture;
 
 /**
  * Callback for {@link ProtocolEncoder} to generate encoded messages such as
- * {@link IoBuffer}s.  {@link ProtocolEncoder} must call {@link #write(Object)}
+ * {@link IoBuffer}s. {@link ProtocolEncoder} must call {@link #write(Object)}
  * for each encoded message.
  *
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public interface ProtocolEncoderOutput {
-    /**
-     * Callback for {@link ProtocolEncoder} to generate an encoded message such
-     * as an {@link IoBuffer}. {@link ProtocolEncoder} must call
-     * {@link #write(Object)} for each encoded message.
-     *
-     * @param encodedMessage the encoded message, typically an {@link IoBuffer}
-     *                       or a {@link FileRegion}.
-     */
-    void write(Object encodedMessage);
-
-    /**
-     * Merges all buffers you wrote via {@link #write(Object)} into
-     * one {@link IoBuffer} and replaces the old fragmented ones with it.
-     * This method is useful when you want to control the way MINA generates
-     * network packets.  Please note that this method only works when you
-     * called {@link #write(Object)} method with only {@link IoBuffer}s.
-     * 
-     * @throws IllegalStateException if you wrote something else than {@link IoBuffer}
-     */
-    void mergeAll();
-
-    /**
-     * Flushes all buffers you wrote via {@link #write(Object)} to
-     * the session.  This operation is asynchronous; please wait for
-     * the returned {@link WriteFuture} if you want to wait for
-     * the buffers flushed.
-     *
-     * @return <tt>null</tt> if there is nothing to flush at all.
-     */
-    WriteFuture flush();
+	/**
+	 * Callback for {@link ProtocolEncoder} to generate an encoded message such as
+	 * an {@link IoBuffer}. {@link ProtocolEncoder} must call {@link #write(Object)}
+	 * for each encoded message.
+	 *
+	 * @param message the encoded message, typically an {@link IoBuffer} or a
+	 *                {@link FileRegion}.
+	 */
+	void write(Object message);
 }
\ No newline at end of file
diff --git a/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java b/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java
index 87b886d..f8497b8 100644
--- a/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java
+++ b/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java
@@ -25,9 +25,9 @@ import static org.junit.Assert.assertTrue;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetEncoder;
+import java.util.Queue;
 
 import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.filterchain.IoFilter.NextFilter;
 import org.apache.mina.core.session.DummySession;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.filter.codec.AbstractProtocolDecoderOutput;
@@ -37,262 +37,250 @@ import org.apache.mina.http.api.HttpRequest;
 import org.junit.Test;
 
 public class HttpServerDecoderTest {
-    private static final CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder(); //$NON-NLS-1$
+	private static final CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder(); //$NON-NLS-1$
 
-    private static final ProtocolDecoder decoder = new HttpServerDecoder();
+	private static final ProtocolDecoder decoder = new HttpServerDecoder();
 
-    /*
-     * Use a single session for all requests in order to test state management better
-     */
-    private static IoSession session = new DummySession();
+	/*
+	 * Use a single session for all requests in order to test state management
+	 * better
+	 */
+	private static IoSession session = new DummySession();
 
-    /**
-     * Build an IO buffer containing a simple minimal HTTP request.
-     * 
-     * @param method the HTTP method
-     * @param body the option body
-     * @return the built IO buffer
-     * @throws CharacterCodingException if encoding fails
-     */
-    protected static IoBuffer getRequestBuffer(String method, String body) throws CharacterCodingException {
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString(method + " / HTTP/1.1\r\nHost: dummy\r\n", encoder);
-        
-        if (body != null) {
-            buffer.putString("Content-Length: " + body.length() + "\r\n\r\n", encoder);
-            buffer.putString(body, encoder);
-        } else {
-            buffer.putString("\r\n", encoder);
-        }
-        
-        buffer.rewind();
-        
-        return buffer;
-    }
+	/**
+	 * Build an IO buffer containing a simple minimal HTTP request.
+	 * 
+	 * @param method the HTTP method
+	 * @param body   the optional body
+	 * @return the built IO buffer
+	 * @throws CharacterCodingException if encoding fails
+	 */
+	protected static IoBuffer getRequestBuffer(String method, String body) throws CharacterCodingException {
+		IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString(method + " / HTTP/1.1\r\nHost: dummy\r\n", encoder);
 
-    protected static IoBuffer getRequestBuffer(String method) throws CharacterCodingException {
-        return getRequestBuffer(method, null);
-    }
+		if (body != null) {
+			buffer.putString("Content-Length: " + body.length() + "\r\n\r\n", encoder);
+			buffer.putString(body, encoder);
+		} else {
+			buffer.putString("\r\n", encoder);
+		}
 
-    /**
-     * Execute an HTPP request and return the queue of messages.
-     * 
-     * @param method the HTTP method
-     * @param body the optional body
-     * @return the protocol output and its queue of messages
-     * @throws Exception if error occurs (encoding,...)
-     */
-    protected static AbstractProtocolDecoderOutput executeRequest(String method, String body) throws Exception {
-        AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
+		buffer.rewind();
 
-        IoBuffer buffer = getRequestBuffer(method, body); //$NON-NLS-1$
-        
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        
-        return out;
-    }
+		return buffer;
+	}
 
-    @Test
-    public void testGetRequestWithoutBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("GET", null);
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	protected static IoBuffer getRequestBuffer(String method) throws CharacterCodingException {
+		return getRequestBuffer(method, null);
+	}
 
-    @Test
-    public void testGetRequestBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("GET", "body");
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	protected static class ProtocolDecoderQueue extends AbstractProtocolDecoderOutput {
+		public Queue<Object> getQueue() {
+			return this.messageQueue;
+		}
+	}
 
-    @Test
-    public void testPutRequestWithoutBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("PUT", null);
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	/**
+	 * Execute an HTTP request and return the queue of messages.
+	 * 
+	 * @param method the HTTP method
+	 * @param body   the optional body
+	 * @return the protocol output and its queue of messages
+	 * @throws Exception if error occurs (encoding,...)
+	 */
+	protected static ProtocolDecoderQueue executeRequest(String method, String body) throws Exception {
+		ProtocolDecoderQueue out = new ProtocolDecoderQueue();
 
-    @Test
-    public void testPutRequestBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("PUT", "body");
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+		IoBuffer buffer = getRequestBuffer(method, body); // $NON-NLS-1$
 
-    @Test
-    public void testPostRequestWithoutBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("POST", null);
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
 
-    @Test
-    public void testPostRequestBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("POST", "body");
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+		return out;
+	}
 
-    @Test
-    public void testDeleteRequestWithoutBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("DELETE", null);
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	@Test
+	public void testGetRequestWithoutBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("GET", null);
+		assertEquals(2, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
 
-    @Test
-    public void testDeleteRequestBody() throws Exception {
-        AbstractProtocolDecoderOutput out = executeRequest("DELETE", "body");
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
-    
-    @Test
-    public void testDIRMINA965NoContent() throws Exception {
-        AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("dummy\r\n\r\n", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(2, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	@Test
+	public void testGetRequestBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("GET", "body");
+		assertEquals(3, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof IoBuffer);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
 
-    @Test
-    public void testDIRMINA965WithContent() throws Exception {
-        AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("dummy\r\nContent-Length: 1\r\n\r\nA", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(3, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
-    @Test
-    public void testDIRMINA965WithContentOnTwoChunks() throws Exception {
-        AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("dummy\r\nContent-Length: 2\r\n\r\nA", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("B", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(4, out.getMessageQueue().size());
-        assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
-    
-    @Test
-    public void verifyThatHeaderWithoutLeadingSpaceIsSupported() throws Exception {
-        AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.0\r\nHost:localhost\r\n\r\n", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(2, out.getMessageQueue().size());
-        HttpRequest request = (HttpRequest) out.getMessageQueue().poll();
-        assertEquals("localhost", request.getHeader("host"));
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	@Test
+	public void testPutRequestWithoutBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("PUT", null);
+		assertEquals(2, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
 
-    @Test
-    public void verifyThatLeadingSpacesAreRemovedFromHeader() throws Exception {
-        AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.0\r\nHost:  localhost\r\n\r\n", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(2, out.getMessageQueue().size());
-        HttpRequest request = (HttpRequest) out.getMessageQueue().poll();
-        assertEquals("localhost", request.getHeader("host"));
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	@Test
+	public void testPutRequestBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("PUT", "body");
+		assertEquals(3, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof IoBuffer);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
 
-    @Test
-    public void verifyThatTrailingSpacesAreRemovedFromHeader() throws Exception {
-        AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
-            public void flush(NextFilter nextFilter, IoSession session) {
-            }
-        };
-        IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
-        buffer.putString("GET / HTTP/1.0\r\nHost:localhost  \r\n\r\n", encoder);
-        buffer.rewind();
-        while (buffer.hasRemaining()) {
-            decoder.decode(session, buffer, out);
-        }
-        assertEquals(2, out.getMessageQueue().size());
-        HttpRequest request = (HttpRequest) out.getMessageQueue().poll();
-        assertEquals("localhost", request.getHeader("host"));
-        assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
-    }
+	@Test
+	public void testPostRequestWithoutBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("POST", null);
+		assertEquals(2, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void testPostRequestBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("POST", "body");
+		assertEquals(3, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof IoBuffer);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void testDeleteRequestWithoutBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("DELETE", null);
+		assertEquals(2, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void testDeleteRequestBody() throws Exception {
+		ProtocolDecoderQueue out = executeRequest("DELETE", "body");
+		assertEquals(3, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof IoBuffer);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void testDIRMINA965NoContent() throws Exception {
+		ProtocolDecoderQueue out = new ProtocolDecoderQueue();
+		IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("dummy\r\n\r\n", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		assertEquals(2, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void testDIRMINA965WithContent() throws Exception {
+		ProtocolDecoderQueue out = new ProtocolDecoderQueue();
+		IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("dummy\r\nContent-Length: 1\r\n\r\nA", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+
+		assertEquals(3, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof IoBuffer);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void testDIRMINA965WithContentOnTwoChunks() throws Exception {
+		ProtocolDecoderQueue out = new ProtocolDecoderQueue();
+		IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("dummy\r\nContent-Length: 2\r\n\r\nA", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("B", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		assertEquals(4, out.getQueue().size());
+		assertTrue(out.getQueue().poll() instanceof HttpRequest);
+		assertTrue(out.getQueue().poll() instanceof IoBuffer);
+		assertTrue(out.getQueue().poll() instanceof IoBuffer);
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void verifyThatHeaderWithoutLeadingSpaceIsSupported() throws Exception {
+		ProtocolDecoderQueue out = new ProtocolDecoderQueue();
+		IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("GET / HTTP/1.0\r\nHost:localhost\r\n\r\n", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		assertEquals(2, out.getQueue().size());
+		HttpRequest request = (HttpRequest) out.getQueue().poll();
+		assertEquals("localhost", request.getHeader("host"));
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void verifyThatLeadingSpacesAreRemovedFromHeader() throws Exception {
+		ProtocolDecoderQueue out = new ProtocolDecoderQueue();
+		IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("GET / HTTP/1.0\r\nHost:  localhost\r\n\r\n", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		assertEquals(2, out.getQueue().size());
+		HttpRequest request = (HttpRequest) out.getQueue().poll();
+		assertEquals("localhost", request.getHeader("host"));
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
+
+	@Test
+	public void verifyThatTrailingSpacesAreRemovedFromHeader() throws Exception {
+		ProtocolDecoderQueue out = new ProtocolDecoderQueue();
+		IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+		buffer.putString("GET / HTTP/1.0\r\nHost:localhost  \r\n\r\n", encoder);
+		buffer.rewind();
+		while (buffer.hasRemaining()) {
+			decoder.decode(session, buffer, out);
+		}
+		assertEquals(2, out.getQueue().size());
+		HttpRequest request = (HttpRequest) out.getQueue().poll();
+		assertEquals("localhost", request.getHeader("host"));
+		assertTrue(out.getQueue().poll() instanceof HttpEndOfContent);
+	}
 }