You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/02/21 11:09:09 UTC

svn commit: r629731 [3/3] - in /incubator/qpid/branches/M2.1/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/q...

Added: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java?rev=629731&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java (added)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java Thu Feb 21 02:09:03 2008
@@ -0,0 +1,440 @@
+package org.apache.mina.filter.codec;
+
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+import org.apache.mina.common.*;
+import org.apache.mina.common.support.DefaultWriteFuture;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+import org.apache.mina.util.SessionLog;
+import org.apache.mina.util.Queue;
+
+
+public class QpidProtocolCodecFilter extends IoFilterAdapter
+{
+    public static final String ENCODER = QpidProtocolCodecFilter.class.getName() + ".encoder";
+    public static final String DECODER = QpidProtocolCodecFilter.class.getName() + ".decoder";
+
+    private static final Class[] EMPTY_PARAMS = new Class[0];
+    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] );
+
+    private final ProtocolCodecFactory factory;
+
+    public QpidProtocolCodecFilter( ProtocolCodecFactory factory )
+    {
+        if( factory == null )
+        {
+            throw new NullPointerException( "factory" );
+        }
+        this.factory = factory;
+    }
+
+    public QpidProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder )
+    {
+        if( encoder == null )
+        {
+            throw new NullPointerException( "encoder" );
+        }
+        if( decoder == null )
+        {
+            throw new NullPointerException( "decoder" );
+        }
+
+        this.factory = new ProtocolCodecFactory()
+        {
+            public ProtocolEncoder getEncoder()
+            {
+                return encoder;
+            }
+
+            public ProtocolDecoder getDecoder()
+            {
+                return decoder;
+            }
+        };
+    }
+
+    public QpidProtocolCodecFilter( final Class encoderClass, final Class decoderClass )
+    {
+        if( encoderClass == null )
+        {
+            throw new NullPointerException( "encoderClass" );
+        }
+        if( decoderClass == null )
+        {
+            throw new NullPointerException( "decoderClass" );
+        }
+        if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) )
+        {
+            throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() );
+        }
+        if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) )
+        {
+            throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() );
+        }
+        try
+        {
+            encoderClass.getConstructor( EMPTY_PARAMS );
+        }
+        catch( NoSuchMethodException e )
+        {
+            throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." );
+        }
+        try
+        {
+            decoderClass.getConstructor( EMPTY_PARAMS );
+        }
+        catch( NoSuchMethodException e )
+        {
+            throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." );
+        }
+
+        this.factory = new ProtocolCodecFactory()
+        {
+            public ProtocolEncoder getEncoder() throws Exception
+            {
+                return ( ProtocolEncoder ) encoderClass.newInstance();
+            }
+
+            public ProtocolDecoder getDecoder() throws Exception
+            {
+                return ( ProtocolDecoder ) decoderClass.newInstance();
+            }
+        };
+    }
+
+    public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception
+    {
+        if( parent.contains( ProtocolCodecFilter.class ) || parent.contains( QpidProtocolCodecFilter.class ) ) // a stock MINA codec filter and this Qpid variant both count
+        {
+            throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." );
+        }
+    }
+
+    public void messageReceived( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
+    {
+        if( !( message instanceof ByteBuffer ) )
+        {
+            nextFilter.messageReceived( session, message );
+            return;
+        }
+
+        ByteBuffer in = ( ByteBuffer ) message;
+        ProtocolDecoder decoder = getDecoder( session );
+        ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
+
+        try
+        {
+            decoder.decode( session, in, decoderOut );
+        }
+        catch( Throwable t )
+        {
+            ProtocolDecoderException pde;
+            if( t instanceof ProtocolDecoderException )
+            {
+                pde = ( ProtocolDecoderException ) t;
+            }
+            else
+            {
+                pde = new ProtocolDecoderException( t );
+            }
+            pde.setHexdump( in.getHexDump() );
+            throw pde;
+        }
+        finally
+        {
+            // Dispose the decoder if this session is connectionless.
+            if( session.getTransportType().isConnectionless() )
+            {
+                disposeDecoder( session );
+            }
+
+            // Release the read buffer.
+            in.release();
+
+            decoderOut.flush();
+        }
+    }
+
+    public void messageSent( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
+    {
+        if( message instanceof HiddenByteBuffer ) // HiddenByteBuffer events stop here; nothing is forwarded
+        {
+            return;
+        }
+
+        if( !( message instanceof MessageByteBuffer ) ) // not a wrapper created by this filter: forward unchanged
+        {
+            nextFilter.messageSent( session, message );
+            return;
+        }
+
+        nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message ); // unwrap and report the carried message object
+    }
+
+    public void filterWrite( IoFilter.NextFilter nextFilter, IoSession session, IoFilter.WriteRequest writeRequest ) throws Exception
+    {
+        Object message = writeRequest.getMessage();
+        if( message instanceof ByteBuffer )
+        {
+            nextFilter.filterWrite( session, writeRequest );
+            return;
+        }
+
+        ProtocolEncoder encoder = getEncoder( session );
+        ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest );
+
+        try
+        {
+            encoder.encode( session, message, encoderOut );
+            encoderOut.flush();
+            nextFilter.filterWrite(
+                    session,
+                    new IoFilter.WriteRequest(
+                            new MessageByteBuffer( writeRequest.getMessage() ),
+                            writeRequest.getFuture(), writeRequest.getDestination() ) );
+        }
+        catch( Throwable t )
+        {
+            ProtocolEncoderException pee;
+            if( t instanceof ProtocolEncoderException )
+            {
+                pee = ( ProtocolEncoderException ) t;
+            }
+            else
+            {
+                pee = new ProtocolEncoderException( t );
+            }
+            throw pee;
+        }
+        finally
+        {
+            // Dispose the encoder if this session is connectionless.
+            if( session.getTransportType().isConnectionless() )
+            {
+                disposeEncoder( session );
+            }
+        }
+    }
+
+    public void sessionClosed( IoFilter.NextFilter nextFilter, IoSession session ) throws Exception
+    {
+        // Call finishDecode() first when a connection is closed.
+        ProtocolDecoder decoder = getDecoder( session );
+        ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
+        try
+        {
+            decoder.finishDecode( session, decoderOut );
+        }
+        catch( Throwable t )
+        {
+            ProtocolDecoderException pde;
+            if( t instanceof ProtocolDecoderException )
+            {
+                pde = ( ProtocolDecoderException ) t;
+            }
+            else
+            {
+                pde = new ProtocolDecoderException( t );
+            }
+            throw pde;
+        }
+        finally
+        {
+            // Dispose all.
+            disposeEncoder( session );
+            disposeDecoder( session );
+
+            decoderOut.flush();
+        }
+
+        nextFilter.sessionClosed( session );
+    }
+
+    private ProtocolEncoder getEncoder( IoSession session ) throws Exception
+    {
+        ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
+        if( encoder == null )
+        {
+            encoder = factory.getEncoder();
+            session.setAttribute( ENCODER, encoder );
+        }
+        return encoder;
+    }
+
+    private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
+    {
+        return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest );
+    }
+
+    private ProtocolDecoder getDecoder( IoSession session ) throws Exception
+    {
+        ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
+        if( decoder == null )
+        {
+            decoder = factory.getDecoder();
+            session.setAttribute( DECODER, decoder );
+        }
+        return decoder;
+    }
+
+    private ProtocolDecoderOutput getDecoderOut( IoSession session, IoFilter.NextFilter nextFilter )
+    {
+        return new SimpleProtocolDecoderOutput( session, nextFilter );
+    }
+
+    // Removes the encoder stored under ENCODER, if any, and disposes it; dispose() failures are logged, not propagated.
+    private void disposeEncoder( IoSession session )
+    {
+        ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
+        if( encoder == null )
+        {
+            return;
+        }
+
+        try
+        {
+            encoder.dispose( session );
+        }
+        catch( Throwable t )
+        {
+            // Best effort: swallow the failure, but log its cause so it can be diagnosed.
+            String description = encoder.getClass().getName() + " (" + encoder + ')';
+            SessionLog.warn( session, "Failed to dispose: " + description, t );
+        }
+    }
+
+    // Removes the decoder stored under DECODER, if any, and disposes it; dispose() failures are logged, not propagated.
+    private void disposeDecoder( IoSession session )
+    {
+        ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
+        if( decoder == null )
+        {
+            return;
+        }
+
+        try
+        {
+            decoder.dispose( session );
+        }
+        catch( Throwable t )
+        {
+            // Best effort: swallow the failure, but log its cause so it can be diagnosed.
+            String description = decoder.getClass().getName() + " (" + decoder + ')';
+            SessionLog.warn( session, "Failed to dispose: " + description, t );
+        }
+    }
+
+    private static class HiddenByteBuffer extends ByteBufferProxy
+    {
+        private HiddenByteBuffer( ByteBuffer buf )
+        {
+            super( buf );
+        }
+    }
+
+    private static class MessageByteBuffer extends ByteBufferProxy
+    {
+        private final Object message; // the original message object this wrapper carries
+
+        private MessageByteBuffer( Object message )
+        {
+            super( EMPTY_BUFFER );
+            this.message = message;
+        }
+
+        public void acquire()
+        {
+            // no-op: this wraps the shared zero-byte EMPTY_BUFFER and exists only to carry the message
+        }
+
+        public void release()
+        {
+            // no-op: this wraps the shared zero-byte EMPTY_BUFFER and exists only to carry the message
+        }
+    }
+
+    private static class ProtocolEncoderOutputImpl implements ProtocolEncoderOutput
+    {
+        private ByteBuffer buffer;
+
+        private final IoSession session;
+        private final IoFilter.NextFilter nextFilter;
+        private final IoFilter.WriteRequest writeRequest;
+
+        public ProtocolEncoderOutputImpl( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
+        {
+            this.session = session;
+            this.nextFilter = nextFilter;
+            this.writeRequest = writeRequest;
+        }
+
+
+
+        public void write( ByteBuffer buf )
+        {
+            if(buffer != null)
+            {
+                flush();
+            }
+            buffer =  buf;
+        }
+
+        public void mergeAll()
+        {
+        }
+
+        /**
+         * Hands the pending buffer, if any, to the next filter and forgets it, so that a
+         * repeated flush() (or the flush triggered by the next write()) cannot submit the
+         * same buffer twice.
+         *
+         * @return the future of the downstream write, or null if nothing was written
+         */
+        public WriteFuture flush()
+        {
+            ByteBuffer buf = buffer;
+            buffer = null;
+
+            // Flush only when there is a buffer and it has remaining bytes.
+            if( buf == null || !buf.hasRemaining() )
+            {
+                return null;
+            }
+
+            return doFlush( buf );
+        }
+
+
+        protected WriteFuture doFlush( ByteBuffer buf )
+        {
+            WriteFuture future = new DefaultWriteFuture( session );
+            nextFilter.filterWrite(
+                    session,
+                    new IoFilter.WriteRequest(
+                            buf,
+                            future, writeRequest.getDestination() ) );
+            return future;
+        }
+    }
+}
+

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Thu Feb 21 02:09:03 2008
@@ -22,6 +22,7 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
 import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 
@@ -48,6 +49,9 @@
  */
 public class AMQDecoder extends CumulativeProtocolDecoder
 {
+
+    private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
+
     /** Holds the 'normal' AMQP data decoder. */
     private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
 
@@ -171,4 +175,97 @@
     {
         _expectProtocolInitiation = expectProtocolInitiation;
     }
+
+
+ /**
+     * Decodes <tt>in</tt>, first appending it to the buffer kept in the session under
+     * <tt>BUFFER</tt> if one exists. <tt>doDecode()</tt> is invoked repeatedly until it
+     * returns <tt>false</tt> or the buffer is drained; undecoded bytes are then copied to
+     * a fresh session buffer, otherwise the session buffer is released and removed.
+     *
+     * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+     *                               <tt>true</tt> not consuming the cumulative buffer.
+     */
+    public void decode( IoSession session, ByteBuffer in,
+                        ProtocolDecoderOutput out ) throws Exception
+    {
+        ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+        // if we have a session buffer, append data to that otherwise
+        // use the buffer read from the network directly
+        if( buf != null )
+        {
+            buf.put( in );
+            buf.flip();
+        }
+        else
+        {
+            buf = in;
+        }
+
+        for( ;; )
+        {
+            int oldPos = buf.position();
+            boolean decoded = doDecode( session, buf, out );
+            if( decoded )
+            {
+                if( buf.position() == oldPos )
+                {
+                    throw new IllegalStateException(
+                            "doDecode() can't return true when buffer is not consumed." );
+                }
+
+                if( !buf.hasRemaining() )
+                {
+                    break;
+                }
+            }
+            else
+            {
+                break;
+            }
+        }
+
+        // if there is any data left that cannot be decoded, we store
+        // it in a buffer in the session and next time this decoder is
+        // invoked the session buffer gets appended to
+        if ( buf.hasRemaining() )
+        {
+            storeRemainingInSession( buf, session );
+        }
+        else
+        {
+            removeSessionBuffer( session );
+        }
+    }
+
+    /**
+     * Releases the cumulative buffer used by the specified <tt>session</tt>.
+     * Please don't forget to call <tt>super.dispose( session )</tt> when
+     * you override this method.
+     */
+    public void dispose( IoSession session ) throws Exception
+    {
+        removeSessionBuffer( session );
+    }
+
+    private void removeSessionBuffer(IoSession session)
+    {
+        ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+        if( buf != null )
+        {
+            buf.release();
+            session.removeAttribute( BUFFER );
+        }
+    }
+
+    private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
+
+    private void storeRemainingInSession(ByteBuffer buf, IoSession session)
+    {
+        ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
+        remainingBuf.setAutoExpand( true );
+        remainingBuf.put( buf );
+        session.setAttribute( BUFFER, remainingBuf );
+    }
+
 }

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Thu Feb 21 02:09:03 2008
@@ -21,6 +21,8 @@
 package org.apache.qpid.framing;
 
 import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
 
 public interface AMQBody
 {
@@ -36,4 +38,6 @@
     
     //public void populateFromBuffer(ByteBuffer buffer, long size)
     //    throws AMQFrameDecodingException, AMQProtocolVersionException;        
+
+    void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
 }

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Thu Feb 21 02:09:03 2008
@@ -27,7 +27,7 @@
     private final int _channel;
 
     private final AMQBody _bodyFrame;
-
+    public static final byte FRAME_END_BYTE = (byte) 0xCE;
 
 
     public AMQFrame(final int channel, final AMQBody bodyFrame)
@@ -47,13 +47,19 @@
         return 1 + 2 + 4 + _bodyFrame.getSize() + 1;
     }
 
+    public static final int getFrameOverhead()
+    {
+        return 1 + 2 + 4 + 1;
+    }
+
+
     public void writePayload(ByteBuffer buffer)
     {
         buffer.put(_bodyFrame.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, _channel);
         EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
         _bodyFrame.writePayload(buffer);
-        buffer.put((byte) 0xCE);
+        buffer.put(FRAME_END_BYTE);
     }
 
     public final int getChannel()
@@ -66,10 +72,54 @@
         return _bodyFrame;
     }
 
-
-
     public String toString()
     {
         return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
     }
+
+    public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body)
+    {
+        buffer.put(body.getFrameType());
+        EncodingUtils.writeUnsignedShort(buffer, channel);
+        EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
+        body.writePayload(buffer);
+        buffer.put(FRAME_END_BYTE);
+
+    }
+
+    /**
+     * Writes two frames for the same channel into the buffer, one after the other.
+     * Each frame gets the same layout that writeFrame produces.
+     *
+     * @param buffer  buffer the frames are put into
+     * @param channel channel number written into both frame headers
+     * @param body1   body of the first frame
+     * @param body2   body of the second frame
+     */
+    public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2)
+    {
+        writeFrame(buffer, channel, body1);
+        writeFrame(buffer, channel, body2);
+    }
+
+    /**
+     * Writes three frames for the same channel into the buffer, one after the other,
+     * in argument order.
+     * Each frame gets the same layout that writeFrame produces: frame type, channel,
+     * body size, body payload and the FRAME_END_BYTE terminator.
+     *
+     * @param buffer  buffer the frames are put into
+     * @param channel channel number written into all three frame headers
+     * @param body1   body of the first frame
+     * @param body2   body of the second frame
+     * @param body3   body of the third frame
+     */
+    public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3)
+    {
+        // Delegate per frame instead of repeating the five header/payload writes inline.
+        writeFrame(buffer, channel, body1);
+        writeFrame(buffer, channel, body2);
+        writeFrame(buffer, channel, body3);
+    }
+
 }

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Thu Feb 21 02:09:03 2008
@@ -24,7 +24,9 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 public abstract class AMQMethodBodyImpl implements AMQMethodBody
 {
@@ -84,6 +86,11 @@
     public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
     {
         return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause);
+    }
+
+    public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException
+    {
+        session.methodFrameReceived(channelId, this);
     }
 
 }

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Thu Feb 21 02:09:03 2008
@@ -26,8 +26,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.WeakHashMap;
+import java.util.*;
 import java.lang.ref.WeakReference;
 
 /**
@@ -38,6 +37,62 @@
  */
 public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
 {
+    private static final byte MINUS = (byte)'-';
+    private static final byte ZERO = (byte) '0';
+
+
+
+    private final class TokenizerImpl implements AMQShortStringTokenizer
+    {
+        private final byte _delim;
+        private int _count = -1;
+        private int _pos = 0;
+
+        public TokenizerImpl(final byte delim)
+        {
+            _delim = delim;
+        }
+
+        public int countTokens()
+        {
+            if(_count == -1)
+            {
+                _count = 1 + AMQShortString.this.occurences(_delim);
+            }
+            return _count;
+        }
+
+        public AMQShortString nextToken()
+        {
+            if(_pos <= AMQShortString.this.length())
+            {
+                int nextDelim = AMQShortString.this.indexOf(_delim, _pos);
+                if(nextDelim == -1)
+                {
+                    nextDelim = AMQShortString.this.length();
+                }
+
+                AMQShortString nextToken = AMQShortString.this.substring(_pos, nextDelim++);
+                _pos = nextDelim;
+                return nextToken;
+            }
+            else
+            {
+                return null;
+            }
+        }
+
+        public boolean hasMoreTokens()
+        {
+            return _pos <= AMQShortString.this.length();
+        }
+    }
+
+    private AMQShortString substring(final int from, final int to)
+    {
+        return new AMQShortString(_data, from, to);
+    }
+
 
     private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap =
             new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>()
@@ -53,7 +108,8 @@
 
     private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
 
-    private final ByteBuffer _data;
+    private final byte[] _data;
+    private final int _offset;
     private int _hashCode;
     private final int _length;
     private static final char[] EMPTY_CHAR_ARRAY = new char[0];
@@ -63,17 +119,25 @@
     public AMQShortString(byte[] data)
     {
 
-        _data = ByteBuffer.wrap(data);
+        _data = data.clone();
         _length = data.length;
+        _offset = 0;
+    }
+
+    public AMQShortString(byte[] data, int pos)
+    {
+        final int size = data[pos++] & 0xFF; // length prefix is an unsigned octet; unmasked, values above 127 turn negative
+        final byte[] dataCopy = new byte[size];
+        System.arraycopy(data,pos,dataCopy,0,size);
+        _length = size;
+        _data = dataCopy;
+        _offset = 0;
+    }
 
     public AMQShortString(String data)
     {
         this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
-        if (data != null)
-        {
-            _hashCode = data.hashCode();
-        }
+
     }
 
     public AMQShortString(char[] data)
@@ -85,14 +149,17 @@
 
         final int length = data.length;
         final byte[] stringBytes = new byte[length];
+        int hash = 0;
         for (int i = 0; i < length; i++)
         {
             stringBytes[i] = (byte) (0xFF & data[i]);
+            hash = (31 * hash) + stringBytes[i];
         }
+        _hashCode = hash;
+        _data = stringBytes;
 
-        _data = ByteBuffer.wrap(stringBytes);
-        _data.rewind();
         _length = length;
+        _offset = 0;
 
     }
 
@@ -108,20 +175,31 @@
 
         }
 
-        _data = ByteBuffer.wrap(stringBytes);
-        _data.rewind();
+        _data = stringBytes;
         _hashCode = hash;
         _length = length;
+        _offset = 0;
 
     }
 
-    private AMQShortString(ByteBuffer data)
+    private AMQShortString(ByteBuffer data, final int length)
     {
-        _data = data;
-        _length = data.limit();
+        byte[] dataBytes = new byte[length];
+        data.get(dataBytes);
+        _data = dataBytes;
+        _length = length;
+        _offset = 0;
+
+    }
 
+    private AMQShortString(final byte[] data, final int from, final int to)
+    {
+        _offset = from;
+        _length = to - from;
+        _data = data;
     }
 
+
     /**
      * Get the length of the short string
      * @return length of the underlying byte array
@@ -134,7 +212,7 @@
     public char charAt(int index)
     {
 
-        return (char) _data.get(index);
+        return (char) _data[_offset + index];
 
     }
 
@@ -146,27 +224,24 @@
     public int writeToByteArray(byte[] encoding, int pos)
     {
         final int size = length();
-        encoding[pos++] = (byte) length();
-        for (int i = 0; i < size; i++)
-        {
-            encoding[pos++] = _data.get(i);
-        }
-
-        return pos;
+        encoding[pos++] = (byte) size;
+        System.arraycopy(_data,_offset,encoding,pos,size);
+        return pos+size;
     }
 
     public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
     {
 
-        final byte len = byteEncodedDestination[pos];
-        if (len == 0)
+
+        final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos);
+        if(shortString.length() == 0)
         {
             return null;
         }
-
-        ByteBuffer data = ByteBuffer.wrap(byteEncodedDestination, pos + 1, len).slice();
-
-        return new AMQShortString(data);
+        else
+        {
+            return shortString;
+        }
     }
 
     public static AMQShortString readFromBuffer(ByteBuffer buffer)
@@ -178,90 +253,59 @@
         }
         else
         {
-            ByteBuffer data = buffer.slice();
-            data.limit(length);
-            data.rewind();
-            buffer.skip(length);
 
-            return new AMQShortString(data);
+            return new AMQShortString(buffer, length);
         }
     }
 
     public byte[] getBytes()
     {
-
-        if (_data.buf().hasArray() && (_data.arrayOffset() == 0))
+        if(_offset == 0 && _length == _data.length)
         {
-            return _data.array();
+            return _data.clone();
         }
         else
         {
-            final int size = length();
-            byte[] b = new byte[size];
-            ByteBuffer buf = _data.duplicate();
-            buf.rewind();
-            buf.get(b);
-
-            return b;
+            byte[] data = new byte[_length];
+            System.arraycopy(_data,_offset,data,0,_length);
+            return data;
         }
-
     }
 
     public void writeToBuffer(ByteBuffer buffer)
     {
 
         final int size = length();
-        if (size != 0)
-        {
-
-            buffer.setAutoExpand(true);
-            buffer.put((byte) size);
-            if (_data.buf().hasArray())
-            {
-                buffer.put(_data.array(), _data.arrayOffset(), length());
-            }
-            else
-            {
-
-                for (int i = 0; i < size; i++)
-                {
-
-                    buffer.put(_data.get(i));
-                }
-            }
-        }
-        else
-        {
-            // really writing out unsigned byte
-            buffer.put((byte) 0);
-        }
+        //buffer.setAutoExpand(true);
+        buffer.put((byte) size);
+        buffer.put(_data, _offset, size);
 
     }
 
     private final class CharSubSequence implements CharSequence
     {
-        private final int _offset;
+        private final int _sequenceOffset;
         private final int _end;
 
         public CharSubSequence(final int offset, final int end)
         {
-            _offset = offset;
+            _sequenceOffset = offset;
             _end = end;
         }
 
         public int length()
         {
-            return _end - _offset;
+            return _end - _sequenceOffset;
         }
 
         public char charAt(int index)
         {
-            return AMQShortString.this.charAt(index + _offset);
+            return AMQShortString.this.charAt(index + _sequenceOffset);
         }
 
         public CharSequence subSequence(int start, int end)
         {
-            return new CharSubSequence(start + _offset, end + _offset);
+            return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset);
         }
     }
 
@@ -272,7 +316,7 @@
 
         for (int i = 0; i < size; i++)
         {
-            chars[i] = (char) _data.get(i);
+            chars[i] = (char) _data[i + _offset];
         }
 
         return chars;
@@ -285,6 +329,17 @@
 
     public boolean equals(Object o)
     {
+
+
+        if(o instanceof AMQShortString)
+        {
+            return equals((AMQShortString)o);
+        }
+        if(o instanceof CharSequence)
+        {
+            return equals((CharSequence)o);
+        }
+
         if (o == null)
         {
             return false;
@@ -295,26 +350,40 @@
             return true;
         }
 
-        if (o instanceof AMQShortString)
-        {
 
-            final AMQShortString otherString = (AMQShortString) o;
+        return false;
 
-            if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
-            {
-                return false;
-            }
+    }
+
+    public boolean equals(final AMQShortString otherString)
+    {
+        if (otherString == this)
+        {
+            return true;
+        }
 
-            return _data.equals(otherString._data);
+        if (otherString == null)
+        {
+            return false;
+        }
 
+        if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+        {
+            return false;
         }
 
-        return (o instanceof CharSequence) && equals((CharSequence) o);
+        return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
+                || Arrays.equals(getBytes(),otherString.getBytes());
 
     }
 
     public boolean equals(CharSequence s)
     {
+        if(s instanceof AMQShortString)
+        {
+            return equals((AMQShortString)s);
+        }
+
         if (s == null)
         {
             return false;
@@ -345,7 +414,7 @@
 
             for (int i = 0; i < size; i++)
             {
-                hash = (31 * hash) + _data.get(i);
+                hash = (31 * hash) + _data[i+_offset];
             }
 
             _hashCode = hash;
@@ -380,8 +449,8 @@
 
             for (int i = 0; i < length(); i++)
             {
-                final byte d = _data.get(i);
-                final byte n = name._data.get(i);
+                final byte d = _data[i+_offset];
+                final byte n = name._data[i+name._offset];
                 if (d < n)
                 {
                     return -1;
@@ -398,6 +467,12 @@
     }
 
 
+    public AMQShortStringTokenizer tokenize(byte delim)
+    {
+        return new TokenizerImpl(delim);
+    }
+
+
     public AMQShortString intern()
     {
 
@@ -435,4 +510,111 @@
         return internString;
 
     }
+
+    private int occurences(final byte delim)
+    {
+        int count = 0;
+        final int end = _offset + _length;
+        for(int i = _offset ; i < end ; i++ )
+        {
+            if(_data[i] == delim)
+            {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private int indexOf(final byte val, final int pos)
+    {
+
+        for(int i = pos; i < length(); i++)
+        {
+            if(_data[_offset+i] == val)
+            {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+
+    public static AMQShortString join(final Collection<AMQShortString> terms,
+                                       final AMQShortString delim)
+    {
+        if(terms.size() == 0)
+        {
+            return EMPTY_STRING;
+        }
+
+        int size = delim.length() * (terms.size() - 1);
+        for(AMQShortString term : terms)
+        {
+            size += term.length();
+        }
+
+        byte[] data = new byte[size];
+        int pos = 0;
+        final byte[] delimData = delim._data;
+        final int delimOffset = delim._offset;
+        final int delimLength = delim._length;
+
+
+        for(AMQShortString term : terms)
+        {
+
+            if(pos!=0)
+            {
+                System.arraycopy(delimData, delimOffset,data,pos, delimLength);
+                pos+=delimLength;
+            }
+            System.arraycopy(term._data,term._offset,data,pos,term._length);
+            pos+=term._length;
+        }
+
+
+
+        return new AMQShortString(data,0,size);  
+    }
+
+    /** Parses this string as a decimal int with an optional leading MINUS byte; int overflow is not detected. */
+    public int toIntValue()
+    {
+        // Walk only the window [_offset, _offset + _length): _data may hold bytes outside this string.
+        int pos = _offset;
+        final int end = _offset + _length;
+        int val = 0;
+        boolean isNegative = (pos < end) && (_data[pos] == MINUS);
+        if(isNegative)
+        {
+            pos++;
+        }
+        while(pos < end)
+        {
+            int digit = (int) (_data[pos++] - ZERO);
+            if((digit < 0) || (digit > 9))
+            {
+                throw new NumberFormatException("\""+toString()+"\" is not a valid number");
+            }
+            val = (val * 10) + digit;
+        }
+        if(isNegative)
+        {
+            val = val * -1;
+        }
+        return val;
+    }
+
+    /**
+     * Tests whether the given byte occurs in this short string.
+     *
+     * @param b the byte to look for
+     * @return true if b is found within this string's bytes, false otherwise
+     */
+    public boolean contains(final byte b)
+    {
+        // indexOf applies _offset, so only this string's window of the backing array is searched
+        return indexOf(b, 0) != -1;
+    }
+
 }

Added: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java?rev=629731&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java (added)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java Thu Feb 21 02:09:03 2008
@@ -0,0 +1,31 @@
+package org.apache.qpid.framing;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public interface AMQShortStringTokenizer
+{
+
+    public int countTokens();
+
+    public AMQShortString nextToken();
+
+    boolean hasMoreTokens();
+}

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java Thu Feb 21 02:09:03 2008
@@ -24,7 +24,6 @@
 
 public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
 {
-    private AMQDataBlock _firstFrame;
 
     private AMQDataBlock[] _blocks;
 
@@ -33,27 +32,12 @@
         _blocks = blocks;
     }
 
-    /**
-     * The encoded block will be logically first before the AMQDataBlocks which are encoded
-     * into the buffer afterwards.
-     * @param encodedBlock already-encoded data
-     * @param blocks some blocks to be encoded.
-     */
-    public CompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock[] blocks)
-    {
-        this(blocks);
-        _firstFrame = encodedBlock;
-    }
 
     public AMQDataBlock[] getBlocks()
     {
         return _blocks;
     }
 
-    public AMQDataBlock getFirstFrame()
-    {
-        return _firstFrame;
-    }
 
     public long getSize()
     {
@@ -62,19 +46,11 @@
         {
             frameSize += _blocks[i].getSize();
         }
-        if (_firstFrame != null)
-        {
-            frameSize += _firstFrame.getSize();
-        }
         return frameSize;
     }
 
     public void writePayload(ByteBuffer buffer)
     {
-        if (_firstFrame != null)
-        {
-            _firstFrame.writePayload(buffer);
-        }
         for (int i = 0; i < _blocks.length; i++)
         {
             _blocks[i].writePayload(buffer);
@@ -90,7 +66,7 @@
         else
         {
             StringBuilder buf = new StringBuilder(this.getClass().getName());
-            buf.append("{encodedBlock=").append(_firstFrame);
+            buf.append("{");
             for (int i = 0 ; i < _blocks.length; i++)
             {
                 buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]");

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Thu Feb 21 02:09:03 2008
@@ -21,6 +21,8 @@
 package org.apache.qpid.framing;
 
 import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
 
 public class ContentBody implements AMQBody
 {
@@ -66,6 +68,12 @@
             ByteBuffer copy = payload.duplicate();
             buffer.put(copy.rewind());
         }
+    }
+
+    public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+            throws AMQException
+    {
+        session.contentBodyReceived(channelId, this);
     }
 
     protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Thu Feb 21 02:09:03 2008
@@ -21,6 +21,8 @@
 package org.apache.qpid.framing;
 
 import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
 
 public class ContentHeaderBody implements AMQBody
 {
@@ -108,6 +110,12 @@
         buffer.putLong(bodySize);
         EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags());
         properties.writePropertyListPayload(buffer);
+    }
+
+    public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+            throws AMQException
+    {
+        session.contentHeaderReceived(channelId, this);
     }
 
     public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties,

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Thu Feb 21 02:09:03 2008
@@ -21,6 +21,8 @@
 package org.apache.qpid.framing;
 
 import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
 
 public class HeartbeatBody implements AMQBody
 {
@@ -53,6 +55,12 @@
 
     public void writePayload(ByteBuffer buffer)
     {
+    }
+
+    public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+            throws AMQException
+    {
+        session.heartbeatBodyReceived(channelId, this);
     }
 
     protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Thu Feb 21 02:09:03 2008
@@ -70,7 +70,7 @@
         return new MethodConverter_0_9.MessagePublishInfoImpl(exchange,
                                           publishBody.getImmediate(),
                                           publishBody.getMandatory(),
-                                          routingKey == null ? null : routingKey.intern());
+                                          routingKey);
 
     }
 

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/Job.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/Job.java Thu Feb 21 02:09:03 2008
@@ -94,21 +94,23 @@
     /**
      * Sequentially processes, up to the maximum number per job, the aggregated continuations in enqueued in this job.
      */
-    void processAll()
+    boolean processAll()
     {
         // limit the number of events processed in one run
-        for (int i = 0; i < _maxEvents; i++)
+        int i = Math.max(_maxEvents, 1); // a limit below 1 would never make progress
+        while( i-- > 0 ) // post-decrement: attempt exactly i events, not i - 1
         {
             Event e = _eventQueue.poll();
             if (e == null)
             {
-                break;
+                return true; // queue drained: run() reports the job as completed
             }
             else
             {
                 e.process(_session);
             }
         }
+        return false; // limit reached, events may remain: run() reports notCompleted
     }
 
     /**
@@ -144,9 +146,15 @@
      */
     public void run()
     {
-        processAll();
-        deactivate();
-        _completionHandler.completed(_session, this);
+        if(processAll())
+        {
+            deactivate();
+            _completionHandler.completed(_session, this);
+        }
+        else
+        {
+            _completionHandler.notCompleted(_session, this);
+        }
     }
 
     /**
@@ -158,5 +166,7 @@
     static interface JobCompletionHandler
     {
         public void completed(IoSession session, Job job);
+
+        public void notCompleted(final IoSession session, final Job job);
     }
 }

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Thu Feb 21 02:09:03 2008
@@ -29,8 +29,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ExecutorService;
 
 /**
  * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it
@@ -84,9 +84,6 @@
     /** Used for debugging purposes. */
     private static final Logger _logger = LoggerFactory.getLogger(PoolingFilter.class);
 
-    /** Holds a mapping from Mina sessions to batched jobs for execution. */
-    private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>();
-
     /** Holds the managed reference to obtain the executor for the batched jobs. */
     private final ReferenceCountingExecutorService _poolReference;
 
@@ -94,7 +91,9 @@
     private final String _name;
 
     /** Defines the maximum number of events that will be batched into a single job. */
-    private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+    static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+
+    private final int _maxEvents;
 
     /**
      * Creates a named pooling filter, on the specified shared thread pool.
@@ -102,10 +101,11 @@
      * @param refCountingPool The thread pool reference.
      * @param name            The identifying name of the filter type.
      */
-    public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+    public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents)
     {
         _poolReference = refCountingPool;
         _name = name;
+        _maxEvents = maxEvents;
     }
 
     /**
@@ -160,20 +160,34 @@
     /**
      * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
      *
-     * @param session The Mina session to work in.
+     * @param job The job.
      * @param event   The event to hand off asynchronously.
      */
-    void fireAsynchEvent(IoSession session, Event event)
+    void fireAsynchEvent(Job job, Event event)
     {
-        Job job = getJobForSession(session);
+
         // job.acquire(); //prevents this job being removed from _jobs
         job.add(event);
 
-        // Additional checks on pool to check that it hasn't shutdown.
-        // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool
-        if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown())
+        final ExecutorService pool = _poolReference.getPool();
+
+        if(pool == null)
         {
-            _poolReference.getPool().execute(job);
+            return;
+        }
+
+        // Rather than checking up front that the pool has not been shut down, catch the
+        // RejectedExecutionException that results from executing on a pool that has been shut down.
+        if (job.activate())
+        {
+            try
+            {
+                pool.execute(job);
+            }
+            catch(RejectedExecutionException e)
+            {
+                _logger.warn("Thread pool shutdown while tasks still outstanding");
+            }
         }
 
     }
@@ -186,7 +200,7 @@
      */
     public void createNewJobForSession(IoSession session)
     {
-        Job job = new Job(session, this, _maxEvents);
+        Job job = new Job(session, this, MAX_JOB_EVENTS);
         session.setAttribute(_name, job);
     }
 
@@ -197,7 +211,7 @@
      *
      * @return The Job for this filter to place asynchronous events into.
      */
-    private Job getJobForSession(IoSession session)
+    public Job getJobForSession(IoSession session)
     {
         return (Job) session.getAttribute(_name);
     }
@@ -233,17 +247,57 @@
         // }
         // }
         // else
+
+
         if (!job.isComplete())
         {
+            final ExecutorService pool = _poolReference.getPool();
+
+            if(pool == null)
+            {
+                return;
+            }
+
+
             // ritchiem : 2006-12-13 Do we need to perform the additional checks here?
             // Can the pool be shutdown at this point?
-            if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown())
+            if (job.activate())
             {
-                _poolReference.getPool().execute(job);
+                try
+                {
+                    pool.execute(job);
+                }
+                catch(RejectedExecutionException e)
+                {
+                    _logger.warn("Thread pool shutdown while tasks still outstanding");
+                }
+
             }
         }
     }
 
+    public void notCompleted(IoSession session, Job job)
+    {
+        final ExecutorService pool = _poolReference.getPool();
+
+        if(pool == null)
+        {
+            return;
+        }
+
+        try
+        {
+            pool.execute(job);
+        }
+        catch(RejectedExecutionException e)
+        {
+            _logger.warn("Thread pool shutdown while tasks still outstanding");
+        }
+
+    }
+
+
+
     /**
      * No-op pass through filter to the next filter in the chain.
      *
@@ -400,7 +454,7 @@
          */
         public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
         {
-            super(refCountingPool, name);
+            super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS));
         }
 
         /**
@@ -412,8 +466,8 @@
          */
         public void messageReceived(NextFilter nextFilter, final IoSession session, Object message)
         {
-
-            fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message));
+            Job job = getJobForSession(session);
+            fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message));
         }
 
         /**
@@ -424,7 +478,8 @@
          */
         public void sessionClosed(final NextFilter nextFilter, final IoSession session)
         {
-            fireAsynchEvent(session, new CloseEvent(nextFilter));
+            Job job = getJobForSession(session);
+            fireAsynchEvent(job, new CloseEvent(nextFilter));
         }
     }
 
@@ -442,7 +497,7 @@
          */
         public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
         {
-            super(refCountingPool, name);
+            super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS));
         }
 
         /**
@@ -454,7 +509,8 @@
          */
         public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
         {
-            fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest));
+            Job job = getJobForSession(session);
+            fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest));
         }
 
         /**
@@ -465,7 +521,8 @@
          */
         public void sessionClosed(final NextFilter nextFilter, final IoSession session)
         {
-            fireAsynchEvent(session, new CloseEvent(nextFilter));
+            Job job = getJobForSession(session);
+            fireAsynchEvent(job, new CloseEvent(nextFilter));
         }
     }
 }

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java Thu Feb 21 02:09:03 2008
@@ -21,6 +21,7 @@
 package org.apache.qpid.protocol;
 
 import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.AMQException;
 
 /**
  * AMQMethodListener is a listener that receives notifications of AMQP methods. The methods are packaged as events in
@@ -57,7 +58,7 @@
      *
      * @todo Consider narrowing the exception.
      */
-    <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws Exception;
+    <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException;
 
     /**
      * Notifies the listener of an error on the event context to which it is listening. The listener should perform

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java Thu Feb 21 02:09:03 2008
@@ -20,8 +20,8 @@
  */
 package org.apache.qpid.protocol;
 
-import org.apache.qpid.framing.VersionSpecificRegistry;
-import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
 
 /**
  * AMQVersionAwareProtocolSession is implemented by all AMQP session classes, that need to provide an awareness to
@@ -46,4 +46,12 @@
 //    public VersionSpecificRegistry getRegistry();
 
     MethodRegistry getMethodRegistry();
+
+
+    public void methodFrameReceived(int channelId, AMQMethodBody body) throws AMQException;
+    public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException;
+    public void contentBodyReceived(int channelId, ContentBody body) throws AMQException;
+    public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
+
+
 }

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Thu Feb 21 02:09:03 2008
@@ -24,20 +24,15 @@
 
 import org.apache.log4j.Logger;
 
-import org.apache.mina.common.IoSession;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import javax.management.JMException;
 
@@ -62,7 +57,7 @@
         AMQQueue queue =
             new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()), false,
                 new AMQShortString("test"), true, _protocolSession.getVirtualHost());
-        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore, null);
+        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
         channel.setDefaultQueue(queue);
         _protocolSession.addChannel(channel);
         channelCount = _mbean.channels().size();
@@ -73,7 +68,7 @@
         assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
 
         // check APIs
-        AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _messageStore, null);
+        AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _messageStore);
         channel3.setLocalTransactional();
         _protocolSession.addChannel(channel3);
         _mbean.rollbackTransactions(2);
@@ -93,14 +88,14 @@
         }
 
         // check if closing of session works
-        _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _messageStore, null));
+        _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _messageStore));
         _mbean.closeConnection();
         try
         {
             channelCount = _mbean.channels().size();
             assertTrue(channelCount == 0);
             // session is now closed so adding another channel should throw an exception
-            _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _messageStore, null));
+            _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _messageStore));
             fail();
         }
         catch (AMQException ex)
@@ -119,7 +114,7 @@
             new AMQMinaProtocolSession(new MockIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true),
                 null);
         _protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test"));
-        _channel = new AMQChannel(_protocolSession, 1, _messageStore, null);
+        _channel = new AMQChannel(_protocolSession, 1, _messageStore);
         _protocolSession.addChannel(_channel);
         _mbean = (AMQProtocolSessionMBean) _protocolSession.getManagedObject();
     }

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java Thu Feb 21 02:09:03 2008
@@ -21,22 +21,12 @@
 package org.apache.qpid.server.protocol;
 
 import junit.framework.TestCase;
-import org.apache.mina.common.IoSession;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.framing.AMQShortString;
-
-import javax.management.JMException;
 
 /** Test class to test MBean operations for AMQMinaProtocolSession. */
 public class MaxChannelsTest extends TestCase
@@ -65,7 +55,7 @@
         {
             for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
             {
-                _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null));
+                _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null));
             }
         }
         catch (AMQException e)

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Thu Feb 21 02:09:03 2008
@@ -24,7 +24,6 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -37,7 +36,6 @@
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.util.TestApplicationRegistry;
 import org.apache.qpid.server.util.NullApplicationRegistry;
 
 import java.util.LinkedList;
@@ -77,7 +75,7 @@
         super.setUp();
         _messageStore = new TestableMemoryMessageStore();
         _protocolSession = new MockProtocolSession(_messageStore);
-        _channel = new AMQChannel(_protocolSession,5, _messageStore, null/*dont need exchange registry*/);
+        _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
 
         _protocolSession.addChannel(_channel);
         _subscriptionManager = new SubscriptionSet();

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Thu Feb 21 02:09:03 2008
@@ -190,6 +190,26 @@
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public void methodFrameReceived(int channelId, AMQMethodBody body)
+    {
+        // No-op stub: this mock does nothing with the received method frame.
+    }
+
+    public void contentHeaderReceived(int channelId, ContentHeaderBody body)
+    {
+        // No-op stub: this mock does nothing with the received content header.
+    }
+
+    public void contentBodyReceived(int channelId, ContentBody body)
+    {
+        // No-op stub: this mock does nothing with the received content body.
+    }
+
+    public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
+    {
+        // No-op stub: this mock does nothing with the received heartbeat.
+    }
+
     public MethodDispatcher getMethodDispatcher()
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.