You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/02/21 11:09:09 UTC
svn commit: r629731 [3/3] - in /incubator/qpid/branches/M2.1/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/q...
Added: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java?rev=629731&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java (added)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java Thu Feb 21 02:09:03 2008
@@ -0,0 +1,440 @@
+package org.apache.mina.filter.codec;
+
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+import org.apache.mina.common.*;
+import org.apache.mina.common.support.DefaultWriteFuture;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+import org.apache.mina.util.SessionLog;
+import org.apache.mina.util.Queue;
+
+
+public class QpidProtocolCodecFilter extends IoFilterAdapter
+{
+ public static final String ENCODER = QpidProtocolCodecFilter.class.getName() + ".encoder";
+ public static final String DECODER = QpidProtocolCodecFilter.class.getName() + ".decoder";
+
+ private static final Class[] EMPTY_PARAMS = new Class[0];
+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] );
+
+ private final ProtocolCodecFactory factory;
+
+ /**
+  * Creates a codec filter that obtains its encoder and decoder from the given factory.
+  *
+  * @param factory source of encoder and decoder instances; must not be null
+  * @throws NullPointerException if <tt>factory</tt> is null
+  */
+ public QpidProtocolCodecFilter( ProtocolCodecFactory factory )
+ {
+     if( null == factory )
+     {
+         throw new NullPointerException( "factory" );
+     }
+
+     this.factory = factory;
+ }
+
+ /**
+  * Creates a codec filter that always hands out the supplied encoder and decoder.
+  *
+  * @param encoder encoder returned by the internal factory; must not be null
+  * @param decoder decoder returned by the internal factory; must not be null
+  * @throws NullPointerException if either argument is null
+  */
+ public QpidProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder )
+ {
+     if( null == encoder )
+     {
+         throw new NullPointerException( "encoder" );
+     }
+     if( null == decoder )
+     {
+         throw new NullPointerException( "decoder" );
+     }
+
+     // Fixed-instance factory: the same two objects are returned on every call.
+     final ProtocolCodecFactory fixedFactory = new ProtocolCodecFactory()
+     {
+         public ProtocolDecoder getDecoder()
+         {
+             return decoder;
+         }
+
+         public ProtocolEncoder getEncoder()
+         {
+             return encoder;
+         }
+     };
+     this.factory = fixedFactory;
+ }
+
+ /**
+  * Creates a codec filter that instantiates a new encoder/decoder through the
+  * public no-argument constructor of the given classes each time the internal
+  * factory is asked for one.
+  *
+  * @param encoderClass a {@link ProtocolEncoder} implementation with a public default constructor
+  * @param decoderClass a {@link ProtocolDecoder} implementation with a public default constructor
+  * @throws NullPointerException     if either class is null
+  * @throws IllegalArgumentException if a class has the wrong type or lacks a public default constructor
+  */
+ public QpidProtocolCodecFilter( final Class encoderClass, final Class decoderClass )
+ {
+     // Null checks first, in the same order as the arguments.
+     if( null == encoderClass )
+     {
+         throw new NullPointerException( "encoderClass" );
+     }
+     if( null == decoderClass )
+     {
+         throw new NullPointerException( "decoderClass" );
+     }
+
+     // Then type compatibility.
+     final boolean encoderTypeOk = ProtocolEncoder.class.isAssignableFrom( encoderClass );
+     if( !encoderTypeOk )
+     {
+         throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() );
+     }
+     final boolean decoderTypeOk = ProtocolDecoder.class.isAssignableFrom( decoderClass );
+     if( !decoderTypeOk )
+     {
+         throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() );
+     }
+
+     // Finally make sure newInstance() will have a constructor to call.
+     try
+     {
+         encoderClass.getConstructor( EMPTY_PARAMS );
+     }
+     catch( NoSuchMethodException missingCtor )
+     {
+         throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." );
+     }
+     try
+     {
+         decoderClass.getConstructor( EMPTY_PARAMS );
+     }
+     catch( NoSuchMethodException missingCtor )
+     {
+         throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." );
+     }
+
+     final ProtocolCodecFactory reflectiveFactory = new ProtocolCodecFactory()
+     {
+         public ProtocolDecoder getDecoder() throws Exception
+         {
+             return ( ProtocolDecoder ) decoderClass.newInstance();
+         }
+
+         public ProtocolEncoder getEncoder() throws Exception
+         {
+             return ( ProtocolEncoder ) encoderClass.newInstance();
+         }
+     };
+     this.factory = reflectiveFactory;
+ }
+
+ /**
+  * Rejects the addition of this filter when the chain already holds a codec filter.
+  * <p>
+  * This class does not extend {@link ProtocolCodecFilter}, so a lookup by that type
+  * alone never finds another <tt>QpidProtocolCodecFilter</tt>; both types are
+  * therefore checked.
+  *
+  * @param parent     chain this filter is about to be added to
+  * @param name       name under which the filter is being registered (unused)
+  * @param nextFilter next filter in the chain (unused)
+  * @throws IllegalStateException if the chain already contains a codec filter of either type
+  */
+ public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception
+ {
+     if( parent.contains( ProtocolCodecFilter.class ) || parent.contains( QpidProtocolCodecFilter.class ) )
+     {
+         throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." );
+     }
+ }
+
+ /**
+  * Decodes incoming {@link ByteBuffer}s with the session's decoder; any other
+  * message type is passed to the next filter untouched.
+  * <p>
+  * Whatever the outcome of decoding, the read buffer is released and the decoder
+  * output is flushed. Failures are rethrown as {@link ProtocolDecoderException}
+  * carrying a hex dump of the input buffer.
+  *
+  * @throws ProtocolDecoderException if the decoder throws anything
+  */
+ public void messageReceived( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
+ {
+     final boolean isRawBytes = message instanceof ByteBuffer;
+     if( !isRawBytes )
+     {
+         nextFilter.messageReceived( session, message );
+         return;
+     }
+
+     final ByteBuffer in = ( ByteBuffer ) message;
+     final ProtocolDecoder decoder = getDecoder( session );
+     final ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
+
+     try
+     {
+         decoder.decode( session, in, decoderOut );
+     }
+     catch( Throwable t )
+     {
+         // Reuse an existing decoder exception, otherwise wrap the cause.
+         final ProtocolDecoderException pde = ( t instanceof ProtocolDecoderException )
+             ? ( ProtocolDecoderException ) t
+             : new ProtocolDecoderException( t );
+         pde.setHexdump( in.getHexDump() );
+         throw pde;
+     }
+     finally
+     {
+         // Connectionless sessions do not keep a decoder between messages.
+         if( session.getTransportType().isConnectionless() )
+         {
+             disposeDecoder( session );
+         }
+
+         // Give the read buffer back, then push decoded messages downstream.
+         in.release();
+         decoderOut.flush();
+     }
+ }
+
+ /**
+  * Translates write-completion notifications back to what the application wrote.
+  * <ul>
+  * <li>{@link HiddenByteBuffer}: the notification is suppressed;</li>
+  * <li>{@link MessageByteBuffer}: the wrapped original message is reported;</li>
+  * <li>anything else: forwarded unchanged.</li>
+  * </ul>
+  */
+ public void messageSent( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
+ {
+     if( message instanceof HiddenByteBuffer )
+     {
+         return;
+     }
+
+     final Object reported = ( message instanceof MessageByteBuffer )
+         ? ( ( MessageByteBuffer ) message ).message
+         : message;
+     nextFilter.messageSent( session, reported );
+ }
+
+ /**
+  * Encodes outgoing messages with the session's encoder. {@link ByteBuffer}
+  * messages are already encoded and are forwarded as they are.
+  * <p>
+  * After the encoder output has been flushed, a zero-length
+  * {@link MessageByteBuffer} carrying the original message and the original
+  * write future is written, so that {@link #messageSent} can report the
+  * original message.
+  *
+  * @throws ProtocolEncoderException if encoding or forwarding throws anything
+  */
+ public void filterWrite( IoFilter.NextFilter nextFilter, IoSession session, IoFilter.WriteRequest writeRequest ) throws Exception
+ {
+     final Object message = writeRequest.getMessage();
+     if( message instanceof ByteBuffer )
+     {
+         nextFilter.filterWrite( session, writeRequest );
+         return;
+     }
+
+     final ProtocolEncoder encoder = getEncoder( session );
+     final ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest );
+
+     try
+     {
+         encoder.encode( session, message, encoderOut );
+         encoderOut.flush();
+
+         final IoFilter.WriteRequest completionMarker = new IoFilter.WriteRequest(
+             new MessageByteBuffer( writeRequest.getMessage() ),
+             writeRequest.getFuture(),
+             writeRequest.getDestination() );
+         nextFilter.filterWrite( session, completionMarker );
+     }
+     catch( Throwable t )
+     {
+         // Reuse an existing encoder exception, otherwise wrap the cause.
+         throw ( t instanceof ProtocolEncoderException )
+             ? ( ProtocolEncoderException ) t
+             : new ProtocolEncoderException( t );
+     }
+     finally
+     {
+         // Connectionless sessions do not keep an encoder between messages.
+         if( session.getTransportType().isConnectionless() )
+         {
+             disposeEncoder( session );
+         }
+     }
+ }
+
+ /**
+  * Gives the decoder a chance to finish, disposes the session's encoder and
+  * decoder, flushes the decoder output and then propagates the close event.
+  * <p>
+  * If <tt>finishDecode()</tt> fails, the cleanup still runs but the close event
+  * is not forwarded because the exception is rethrown.
+  *
+  * @throws ProtocolDecoderException if <tt>finishDecode()</tt> throws anything
+  */
+ public void sessionClosed( IoFilter.NextFilter nextFilter, IoSession session ) throws Exception
+ {
+     final ProtocolDecoder decoder = getDecoder( session );
+     final ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
+
+     try
+     {
+         // finishDecode() goes first when a connection is closed.
+         decoder.finishDecode( session, decoderOut );
+     }
+     catch( Throwable t )
+     {
+         throw ( t instanceof ProtocolDecoderException )
+             ? ( ProtocolDecoderException ) t
+             : new ProtocolDecoderException( t );
+     }
+     finally
+     {
+         // Drop both codecs, then push out whatever was decoded.
+         disposeEncoder( session );
+         disposeDecoder( session );
+         decoderOut.flush();
+     }
+
+     nextFilter.sessionClosed( session );
+ }
+
+ private ProtocolEncoder getEncoder( IoSession session ) throws Exception
+ {
+ ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
+ if( encoder == null )
+ {
+ encoder = factory.getEncoder();
+ session.setAttribute( ENCODER, encoder );
+ }
+ return encoder;
+ }
+
+ private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
+ {
+ return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest );
+ }
+
+ private ProtocolDecoder getDecoder( IoSession session ) throws Exception
+ {
+ ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
+ if( decoder == null )
+ {
+ decoder = factory.getDecoder();
+ session.setAttribute( DECODER, decoder );
+ }
+ return decoder;
+ }
+
+ private ProtocolDecoderOutput getDecoderOut( IoSession session, IoFilter.NextFilter nextFilter )
+ {
+ return new SimpleProtocolDecoderOutput( session, nextFilter );
+ }
+
+ /**
+  * Removes the session's encoder (if any) and disposes it.
+  * <p>
+  * Disposal is best-effort: a failure is logged, including its cause, and is
+  * not propagated to the caller.
+  */
+ private void disposeEncoder( IoSession session )
+ {
+     ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
+     if( encoder == null )
+     {
+         return;
+     }
+
+     try
+     {
+         encoder.dispose( session );
+     }
+     catch( Throwable t )
+     {
+         // Pass the throwable along so the reason for the failure is not lost.
+         SessionLog.warn(
+             session,
+             "Failed to dispose: " + encoder.getClass().getName() +
+             " (" + encoder + ')',
+             t );
+     }
+ }
+
+ /**
+  * Removes the session's decoder (if any) and disposes it.
+  * <p>
+  * Disposal is best-effort: a failure is logged, including its cause, and is
+  * not propagated to the caller.
+  */
+ private void disposeDecoder( IoSession session )
+ {
+     ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
+     if( decoder == null )
+     {
+         return;
+     }
+
+     try
+     {
+         decoder.dispose( session );
+     }
+     catch( Throwable t )
+     {
+         // Pass the throwable along so the reason for the failure is not lost.
+         SessionLog.warn(
+             session,
+             "Failed to dispose: " + decoder.getClass().getName() +
+             " (" + decoder + ')',
+             t );
+     }
+ }
+
+ /**
+  * Marker buffer type: {@link #messageSent} swallows the sent-notification for
+  * any message of this type. No code in this class creates instances of it.
+  */
+ private static class HiddenByteBuffer extends ByteBufferProxy
+ {
+     private HiddenByteBuffer( ByteBuffer delegate )
+     {
+         super( delegate );
+     }
+ }
+
+ /**
+  * Zero-length buffer whose only purpose is to carry the original (un-encoded)
+  * message through the write path so that {@link #messageSent} can report it.
+  */
+ private static class MessageByteBuffer extends ByteBufferProxy
+ {
+     /** The message the application originally wrote. */
+     private final Object message;
+
+     private MessageByteBuffer( Object message )
+     {
+         super( EMPTY_BUFFER );
+         this.message = message;
+     }
+
+     public void acquire()
+     {
+         // Nothing to do: this wraps the shared zero-byte buffer and only carries the message.
+     }
+
+     public void release()
+     {
+         // Nothing to do: this wraps the shared zero-byte buffer and only carries the message.
+     }
+ }
+
+ /**
+  * Encoder output that holds at most one pending buffer and writes it to the
+  * next filter on {@link #flush()}.
+  * <p>
+  * The pending buffer is cleared once it has been handed downstream, so calling
+  * <tt>flush()</tt> repeatedly (for example once by the encoder and once by the
+  * filter) never writes the same buffer twice.
+  */
+ private static class ProtocolEncoderOutputImpl implements ProtocolEncoderOutput
+ {
+     /** Buffer written by the encoder that has not been flushed yet, or null. */
+     private ByteBuffer buffer;
+
+     private final IoSession session;
+     private final IoFilter.NextFilter nextFilter;
+     private final IoFilter.WriteRequest writeRequest;
+
+     public ProtocolEncoderOutputImpl( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
+     {
+         this.session = session;
+         this.nextFilter = nextFilter;
+         this.writeRequest = writeRequest;
+     }
+
+     /**
+      * Queues <tt>buf</tt> for writing; a previously queued buffer is flushed
+      * first so that buffers leave in the order they were written.
+      */
+     public void write( ByteBuffer buf )
+     {
+         if( buffer != null )
+         {
+             flush();
+         }
+         buffer = buf;
+     }
+
+     /** No-op: there is never more than one pending buffer to merge. */
+     public void mergeAll()
+     {
+     }
+
+     /**
+      * Writes the pending buffer, if any, to the next filter.
+      *
+      * @return the future of the write, or null when nothing was pending or the
+      *         pending buffer had no remaining bytes
+      */
+     public WriteFuture flush()
+     {
+         final ByteBuffer pending = buffer;
+         if( pending == null )
+         {
+             return null;
+         }
+
+         // Forget the buffer before handing it on so a later flush() cannot resend it.
+         buffer = null;
+
+         // Flush only when the buffer has remaining.
+         if( !pending.hasRemaining() )
+         {
+             return null;
+         }
+         return doFlush( pending );
+     }
+
+     /**
+      * Wraps <tt>buf</tt> in a new write request with its own future, addressed to
+      * the destination of the original request, and passes it to the next filter.
+      */
+     protected WriteFuture doFlush( ByteBuffer buf )
+     {
+         WriteFuture future = new DefaultWriteFuture( session );
+         nextFilter.filterWrite(
+             session,
+             new IoFilter.WriteRequest(
+                 buf,
+                 future, writeRequest.getDestination() ) );
+         return future;
+     }
+ }
+}
+
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Thu Feb 21 02:09:03 2008
@@ -22,6 +22,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
@@ -48,6 +49,9 @@
*/
public class AMQDecoder extends CumulativeProtocolDecoder
{
+
+ private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
+
/** Holds the 'normal' AMQP data decoder. */
private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
@@ -171,4 +175,97 @@
{
_expectProtocolInitiation = expectProtocolInitiation;
}
+
+
+ /**
+  * Decodes as much of the available data as possible by calling
+  * {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)} repeatedly
+  * until it returns <tt>false</tt> or the buffer is exhausted.
+  * <p>
+  * If a buffer of left-over bytes is stored on the session, <tt>in</tt> is
+  * appended to it and decoding runs on that buffer; otherwise decoding runs on
+  * <tt>in</tt> directly. Bytes still undecoded afterwards are copied into a new
+  * session buffer; when nothing is left, the session buffer is removed.
+  *
+  * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+  *                               <tt>true</tt> not consuming the buffer.
+  */
+ public void decode( IoSession session, ByteBuffer in,
+                     ProtocolDecoderOutput out ) throws Exception
+ {
+     final ByteBuffer leftOver = ( ByteBuffer ) session.getAttribute( BUFFER );
+     final ByteBuffer buf;
+     if( leftOver == null )
+     {
+         // nothing pending from earlier reads: decode straight from the network buffer
+         buf = in;
+     }
+     else
+     {
+         leftOver.put( in );
+         leftOver.flip();
+         buf = leftOver;
+     }
+
+     while( true )
+     {
+         final int startPos = buf.position();
+         if( !doDecode( session, buf, out ) )
+         {
+             break;
+         }
+         if( buf.position() == startPos )
+         {
+             throw new IllegalStateException(
+                 "doDecode() can't return true when buffer is not consumed." );
+         }
+         if( !buf.hasRemaining() )
+         {
+             break;
+         }
+     }
+
+     // keep whatever could not be decoded for the next invocation,
+     // or drop the session buffer when everything was consumed
+     if( buf.hasRemaining() )
+     {
+         storeRemainingInSession( buf, session );
+     }
+     else
+     {
+         removeSessionBuffer( session );
+     }
+ }
+
+ /**
+  * Releases and removes the left-over buffer this decoder keeps on the
+  * specified <tt>session</tt>, if there is one.
+  */
+ public void dispose( IoSession session ) throws Exception
+ {
+     this.removeSessionBuffer( session );
+ }
+
+ private void removeSessionBuffer(IoSession session)
+ {
+ ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+ if( buf != null )
+ {
+ buf.release();
+ session.removeAttribute( BUFFER );
+ }
+ }
+
+ private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
+
+ private void storeRemainingInSession(ByteBuffer buf, IoSession session)
+ {
+ ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
+ remainingBuf.setAutoExpand( true );
+ remainingBuf.put( buf );
+ session.setAttribute( BUFFER, remainingBuf );
+ }
+
}
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Thu Feb 21 02:09:03 2008
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public interface AMQBody
{
@@ -36,4 +38,6 @@
//public void populateFromBuffer(ByteBuffer buffer, long size)
// throws AMQFrameDecodingException, AMQProtocolVersionException;
+
+ void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
}
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Thu Feb 21 02:09:03 2008
@@ -27,7 +27,7 @@
private final int _channel;
private final AMQBody _bodyFrame;
-
+ public static final byte FRAME_END_BYTE = (byte) 0xCE;
public AMQFrame(final int channel, final AMQBody bodyFrame)
@@ -47,13 +47,19 @@
return 1 + 2 + 4 + _bodyFrame.getSize() + 1;
}
+ /**
+  * Returns the number of bytes a frame occupies in addition to its body: the
+  * same constant terms that the size computation of this class adds to the
+  * body size (1 + 2 + 4 before the body, 1 after it).
+  */
+ public static final int getFrameOverhead()
+ {
+     final int beforeBody = 1 + 2 + 4;
+     final int afterBody = 1;
+     return beforeBody + afterBody;
+ }
+
+
public void writePayload(ByteBuffer buffer)
{
buffer.put(_bodyFrame.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, _channel);
EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
_bodyFrame.writePayload(buffer);
- buffer.put((byte) 0xCE);
+ buffer.put(FRAME_END_BYTE);
}
public final int getChannel()
@@ -66,10 +72,54 @@
return _bodyFrame;
}
-
-
public String toString()
{
return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
}
+
+ /**
+ * Writes a single frame for <tt>body</tt> on the given channel into <tt>buffer</tt>
+ * without creating an AMQFrame instance.
+ *
+ * @param buffer destination buffer
+ * @param channel channel number written into the frame header
+ * @param body body supplying the frame type, size and payload
+ */
+ public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body)
+ {
+ // header: frame type, channel, body size
+ buffer.put(body.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
+ // payload followed by the frame-end marker
+ body.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+
+ }
+
+ /**
+  * Writes two consecutive frames on the same channel into <tt>buffer</tt>,
+  * first for <tt>body1</tt> and then for <tt>body2</tt>.
+  */
+ public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2)
+ {
+     writeFrame(buffer, channel, body1);
+     writeFrame(buffer, channel, body2);
+ }
+
+ /**
+  * Writes three consecutive frames on the same channel into <tt>buffer</tt>,
+  * in the order <tt>body1</tt>, <tt>body2</tt>, <tt>body3</tt>.
+  */
+ public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3)
+ {
+     writeFrame(buffer, channel, body1);
+     writeFrame(buffer, channel, body2);
+     writeFrame(buffer, channel, body3);
+ }
+
}
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Thu Feb 21 02:09:03 2008
@@ -24,7 +24,9 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
@@ -84,6 +86,11 @@
public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
{
return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException
+ {
+ session.methodFrameReceived(channelId, this);
}
}
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Thu Feb 21 02:09:03 2008
@@ -26,8 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.WeakHashMap;
+import java.util.*;
import java.lang.ref.WeakReference;
/**
@@ -38,6 +37,62 @@
*/
public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
{
+ private static final byte MINUS = (byte)'-';
+ private static final byte ZERO = (byte) '0';
+
+
+
+ /**
+  * Splits the enclosing string on a single delimiter byte. Empty tokens are
+  * returned for adjacent, leading and trailing delimiters, so the number of
+  * tokens is always one more than the number of delimiters.
+  */
+ private final class TokenizerImpl implements AMQShortStringTokenizer
+ {
+     private final byte _delim;
+     /** Cached token count; -1 until first computed. */
+     private int _count = -1;
+     /** Index (relative to the enclosing string) where the next token starts. */
+     private int _pos = 0;
+
+     public TokenizerImpl(final byte delim)
+     {
+         _delim = delim;
+     }
+
+     public int countTokens()
+     {
+         if(_count != -1)
+         {
+             return _count;
+         }
+         _count = 1 + AMQShortString.this.occurences(_delim);
+         return _count;
+     }
+
+     public AMQShortString nextToken()
+     {
+         final int total = AMQShortString.this.length();
+         if(_pos > total)
+         {
+             return null;
+         }
+
+         // the token runs up to the next delimiter, or to the end when none is left
+         final int found = AMQShortString.this.indexOf(_delim, _pos);
+         final int tokenEnd = (found == -1) ? total : found;
+
+         final AMQShortString token = AMQShortString.this.substring(_pos, tokenEnd);
+         // step over the delimiter; this moves past length() after the last token
+         _pos = tokenEnd + 1;
+         return token;
+     }
+
+     public boolean hasMoreTokens()
+     {
+         return !(_pos > AMQShortString.this.length());
+     }
+ }
+
+ /**
+  * Returns a view of this string covering the characters <tt>from</tt>
+  * (inclusive) to <tt>to</tt> (exclusive). The view shares this string's
+  * backing array.
+  * <p>
+  * Both indices are relative to this string, so this string's own offset into
+  * the backing array is added before the view is created; without that, a
+  * substring of a substring would point at the wrong bytes.
+  */
+ private AMQShortString substring(final int from, final int to)
+ {
+     return new AMQShortString(_data, _offset + from, _offset + to);
+ }
+
private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap =
new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>()
@@ -53,7 +108,8 @@
private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
- private final ByteBuffer _data;
+ private final byte[] _data;
+ private final int _offset;
private int _hashCode;
private final int _length;
private static final char[] EMPTY_CHAR_ARRAY = new char[0];
@@ -63,17 +119,25 @@
public AMQShortString(byte[] data)
{
- _data = ByteBuffer.wrap(data);
+ _data = data.clone();
_length = data.length;
+ _offset = 0;
+ }
+
+ /**
+  * Reads a length-prefixed short string from <tt>data</tt> starting at
+  * <tt>pos</tt>: one length byte followed by that many bytes of content, which
+  * are copied into a private array.
+  * <p>
+  * The length byte is treated as unsigned (0-255). Reading it as a signed
+  * <tt>byte</tt> would turn lengths above 127 into negative array sizes.
+  *
+  * @param data encoded bytes
+  * @param pos  index of the length byte
+  */
+ public AMQShortString(byte[] data, int pos)
+ {
+     final int size = data[pos++] & 0xFF;
+     final byte[] dataCopy = new byte[size];
+     System.arraycopy(data,pos,dataCopy,0,size);
+     _length = size;
+     _data = dataCopy;
+     _offset = 0;
}
public AMQShortString(String data)
{
this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
- if (data != null)
- {
- _hashCode = data.hashCode();
- }
+
}
public AMQShortString(char[] data)
@@ -85,14 +149,17 @@
final int length = data.length;
final byte[] stringBytes = new byte[length];
+ int hash = 0;
for (int i = 0; i < length; i++)
{
stringBytes[i] = (byte) (0xFF & data[i]);
+ hash = (31 * hash) + stringBytes[i];
}
+ _hashCode = hash;
+ _data = stringBytes;
- _data = ByteBuffer.wrap(stringBytes);
- _data.rewind();
_length = length;
+ _offset = 0;
}
@@ -108,20 +175,31 @@
}
- _data = ByteBuffer.wrap(stringBytes);
- _data.rewind();
+ _data = stringBytes;
_hashCode = hash;
_length = length;
+ _offset = 0;
}
- private AMQShortString(ByteBuffer data)
+ private AMQShortString(ByteBuffer data, final int length)
{
- _data = data;
- _length = data.limit();
+ byte[] dataBytes = new byte[length];
+ data.get(dataBytes);
+ _data = dataBytes;
+ _length = length;
+ _offset = 0;
+
+ }
+ /**
+  * Creates a string that views <tt>data[from..to)</tt> without copying; both
+  * indices are absolute positions in <tt>data</tt>.
+  */
+ private AMQShortString(final byte[] data, final int from, final int to)
+ {
+     _data = data;
+     _offset = from;
+     _length = to - from;
}
+
/**
* Get the length of the short string
* @return length of the underlying byte array
@@ -134,7 +212,7 @@
public char charAt(int index)
{
- return (char) _data.get(index);
+ return (char) _data[_offset + index];
}
@@ -146,27 +224,24 @@
public int writeToByteArray(byte[] encoding, int pos)
{
final int size = length();
- encoding[pos++] = (byte) length();
- for (int i = 0; i < size; i++)
- {
- encoding[pos++] = _data.get(i);
- }
-
- return pos;
+ encoding[pos++] = (byte) size;
+ System.arraycopy(_data,_offset,encoding,pos,size);
+ return pos+size;
}
public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
{
- final byte len = byteEncodedDestination[pos];
- if (len == 0)
+
+ final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos);
+ if(shortString.length() == 0)
{
return null;
}
-
- ByteBuffer data = ByteBuffer.wrap(byteEncodedDestination, pos + 1, len).slice();
-
- return new AMQShortString(data);
+ else
+ {
+ return shortString;
+ }
}
public static AMQShortString readFromBuffer(ByteBuffer buffer)
@@ -178,90 +253,59 @@
}
else
{
- ByteBuffer data = buffer.slice();
- data.limit(length);
- data.rewind();
- buffer.skip(length);
- return new AMQShortString(data);
+ return new AMQShortString(buffer, length);
}
}
public byte[] getBytes()
{
-
- if (_data.buf().hasArray() && (_data.arrayOffset() == 0))
+ if(_offset == 0 && _length == _data.length)
{
- return _data.array();
+ return _data.clone();
}
else
{
- final int size = length();
- byte[] b = new byte[size];
- ByteBuffer buf = _data.duplicate();
- buf.rewind();
- buf.get(b);
-
- return b;
+ byte[] data = new byte[_length];
+ System.arraycopy(_data,_offset,data,0,_length);
+ return data;
}
-
}
public void writeToBuffer(ByteBuffer buffer)
{
final int size = length();
- if (size != 0)
- {
-
- buffer.setAutoExpand(true);
- buffer.put((byte) size);
- if (_data.buf().hasArray())
- {
- buffer.put(_data.array(), _data.arrayOffset(), length());
- }
- else
- {
-
- for (int i = 0; i < size; i++)
- {
-
- buffer.put(_data.get(i));
- }
- }
- }
- else
- {
- // really writing out unsigned byte
- buffer.put((byte) 0);
- }
+ //buffer.setAutoExpand(true);
+ buffer.put((byte) size);
+ buffer.put(_data, _offset, size);
}
private final class CharSubSequence implements CharSequence
{
- private final int _offset;
+ private final int _sequenceOffset;
private final int _end;
public CharSubSequence(final int offset, final int end)
{
- _offset = offset;
+ _sequenceOffset = offset;
_end = end;
}
public int length()
{
- return _end - _offset;
+ return _end - _sequenceOffset;
}
public char charAt(int index)
{
- return AMQShortString.this.charAt(index + _offset);
+ return AMQShortString.this.charAt(index + _sequenceOffset);
}
public CharSequence subSequence(int start, int end)
{
- return new CharSubSequence(start + _offset, end + _offset);
+ return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset);
}
}
@@ -272,7 +316,7 @@
for (int i = 0; i < size; i++)
{
- chars[i] = (char) _data.get(i);
+ chars[i] = (char) _data[i + _offset];
}
return chars;
@@ -285,6 +329,17 @@
public boolean equals(Object o)
{
+
+
+ if(o instanceof AMQShortString)
+ {
+ return equals((AMQShortString)o);
+ }
+ if(o instanceof CharSequence)
+ {
+ return equals((CharSequence)o);
+ }
+
if (o == null)
{
return false;
@@ -295,26 +350,40 @@
return true;
}
- if (o instanceof AMQShortString)
- {
- final AMQShortString otherString = (AMQShortString) o;
+ return false;
- if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
- {
- return false;
- }
+ }
+
+ public boolean equals(final AMQShortString otherString)
+ {
+ if (otherString == this)
+ {
+ return true;
+ }
- return _data.equals(otherString._data);
+ if (otherString == null)
+ {
+ return false;
+ }
+ if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+ {
+ return false;
}
- return (o instanceof CharSequence) && equals((CharSequence) o);
+ return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
+ || Arrays.equals(getBytes(),otherString.getBytes());
}
public boolean equals(CharSequence s)
{
+ if(s instanceof AMQShortString)
+ {
+ return equals((AMQShortString)s);
+ }
+
if (s == null)
{
return false;
@@ -345,7 +414,7 @@
for (int i = 0; i < size; i++)
{
- hash = (31 * hash) + _data.get(i);
+ hash = (31 * hash) + _data[i+_offset];
}
_hashCode = hash;
@@ -380,8 +449,8 @@
for (int i = 0; i < length(); i++)
{
- final byte d = _data.get(i);
- final byte n = name._data.get(i);
+ final byte d = _data[i+_offset];
+ final byte n = name._data[i+name._offset];
if (d < n)
{
return -1;
@@ -398,6 +467,12 @@
}
+ /**
+  * Returns a tokenizer that splits this string on the given delimiter byte.
+  *
+  * @param delim delimiter byte
+  * @return a new tokenizer positioned at the start of this string
+  */
+ public AMQShortStringTokenizer tokenize(byte delim)
+ {
+     final AMQShortStringTokenizer tokenizer = new TokenizerImpl(delim);
+     return tokenizer;
+ }
+
+
public AMQShortString intern()
{
@@ -435,4 +510,111 @@
return internString;
}
+
+ /**
+  * Counts how many bytes of this string are equal to <tt>delim</tt>.
+  */
+ private int occurences(final byte delim)
+ {
+     int matches = 0;
+     int idx = _offset;
+     final int limit = _offset + _length;
+     while(idx < limit)
+     {
+         if(_data[idx] == delim)
+         {
+             matches++;
+         }
+         idx++;
+     }
+     return matches;
+ }
+
+ private int indexOf(final byte val, final int pos)
+ {
+
+ for(int i = pos; i < length(); i++)
+ {
+ if(_data[_offset+i] == val)
+ {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+
+ /**
+  * Concatenates <tt>terms</tt> in iteration order, placing <tt>delim</tt>
+  * between each pair of consecutive terms.
+  * <p>
+  * The delimiter is inserted before every term except the first, including
+  * when earlier terms are empty. Testing the write position instead would
+  * drop the delimiter after leading empty terms and leave unused zero bytes
+  * at the end of the result.
+  *
+  * @param terms strings to join
+  * @param delim separator placed between terms
+  * @return the joined string, or <tt>EMPTY_STRING</tt> when <tt>terms</tt> is empty
+  */
+ public static AMQShortString join(final Collection<AMQShortString> terms,
+                                   final AMQShortString delim)
+ {
+     if(terms.isEmpty())
+     {
+         return EMPTY_STRING;
+     }
+
+     // total size = all terms plus one delimiter between each pair
+     int size = delim.length() * (terms.size() - 1);
+     for(AMQShortString term : terms)
+     {
+         size += term.length();
+     }
+
+     final byte[] data = new byte[size];
+     final byte[] delimData = delim._data;
+     final int delimOffset = delim._offset;
+     final int delimLength = delim._length;
+
+     int pos = 0;
+     boolean first = true;
+     for(AMQShortString term : terms)
+     {
+         if(!first)
+         {
+             System.arraycopy(delimData, delimOffset, data, pos, delimLength);
+             pos += delimLength;
+         }
+         first = false;
+
+         System.arraycopy(term._data, term._offset, data, pos, term._length);
+         pos += term._length;
+     }
+
+     return new AMQShortString(data,0,size);
+ }
+
+ /**
+  * Parses this string as a decimal integer with an optional leading '-'.
+  * <p>
+  * The bytes are read relative to this string's offset into its backing
+  * array, so the result is also correct for strings that view part of a larger
+  * array (such as tokens). Overflow is not detected.
+  *
+  * @return the parsed value
+  * @throws NumberFormatException if the string is empty, consists only of '-',
+  *                               or contains a non-digit character
+  */
+ public int toIntValue()
+ {
+     final int end = _offset + _length;
+     int pos = _offset;
+
+     final boolean isNegative = (pos < end) && (_data[pos] == MINUS);
+     if(isNegative)
+     {
+         pos++;
+     }
+
+     // at least one digit is required
+     if(pos == end)
+     {
+         throw new NumberFormatException("\""+toString()+"\" is not a valid number");
+     }
+
+     int val = 0;
+     while(pos < end)
+     {
+         final int digit = _data[pos++] - ZERO;
+         if((digit < 0) || (digit > 9))
+         {
+             throw new NumberFormatException("\""+toString()+"\" is not a valid number");
+         }
+         val = (val * 10) + digit;
+     }
+
+     return isNegative ? -val : val;
+ }
+
+ /**
+  * Tells whether any byte of this string equals <tt>b</tt>.
+  * <p>
+  * Only the bytes belonging to this string are examined, that is the
+  * <tt>_length</tt> bytes starting at <tt>_offset</tt> in the backing array.
+  *
+  * @param b byte to look for
+  * @return true if the byte occurs in this string
+  */
+ public boolean contains(final byte b)
+ {
+     final int end = _offset + _length;
+     for(int i = _offset; i < end; i++)
+     {
+         if(_data[i] == b)
+         {
+             return true;
+         }
+     }
+     return false;
+ }
+
}
Added: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java?rev=629731&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java (added)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java Thu Feb 21 02:09:03 2008
@@ -0,0 +1,31 @@
+package org.apache.qpid.framing;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+/**
+ * Sequential access to a series of {@link AMQShortString} tokens.
+ *
+ * NOTE(review): the method names mirror java.util.StringTokenizer; the semantics presumably
+ * match it as well - confirm against the implementing classes.
+ */
+public interface AMQShortStringTokenizer
+{
+
+ /** @return the token count; presumably the number of tokens still to be returned - TODO confirm */
+ int countTokens();
+
+ /** @return the next token; behaviour when no tokens remain is defined by the implementation */
+ AMQShortString nextToken();
+
+ /** @return true if a further call to nextToken() can supply a token */
+ boolean hasMoreTokens();
+}
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java Thu Feb 21 02:09:03 2008
@@ -24,7 +24,6 @@
public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
{
- private AMQDataBlock _firstFrame;
private AMQDataBlock[] _blocks;
@@ -33,27 +32,12 @@
_blocks = blocks;
}
- /**
- * The encoded block will be logically first before the AMQDataBlocks which are encoded
- * into the buffer afterwards.
- * @param encodedBlock already-encoded data
- * @param blocks some blocks to be encoded.
- */
- public CompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock[] blocks)
- {
- this(blocks);
- _firstFrame = encodedBlock;
- }
public AMQDataBlock[] getBlocks()
{
return _blocks;
}
- public AMQDataBlock getFirstFrame()
- {
- return _firstFrame;
- }
public long getSize()
{
@@ -62,19 +46,11 @@
{
frameSize += _blocks[i].getSize();
}
- if (_firstFrame != null)
- {
- frameSize += _firstFrame.getSize();
- }
return frameSize;
}
public void writePayload(ByteBuffer buffer)
{
- if (_firstFrame != null)
- {
- _firstFrame.writePayload(buffer);
- }
for (int i = 0; i < _blocks.length; i++)
{
_blocks[i].writePayload(buffer);
@@ -90,7 +66,7 @@
else
{
StringBuilder buf = new StringBuilder(this.getClass().getName());
- buf.append("{encodedBlock=").append(_firstFrame);
+ buf.append("{");
for (int i = 0 ; i < _blocks.length; i++)
{
buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]");
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Thu Feb 21 02:09:03 2008
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public class ContentBody implements AMQBody
{
@@ -66,6 +68,12 @@
ByteBuffer copy = payload.duplicate();
buffer.put(copy.rewind());
}
+ }
+
+ /**
+  * Hands this content body to the given session by calling
+  * {@code session.contentBodyReceived(channelId, this)}.
+  *
+  * @param channelId the channel id passed through to the session
+  * @param session the session that receives this body
+  * @throws AMQException if thrown by the session's contentBodyReceived
+  */
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.contentBodyReceived(channelId, this);
}
protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Thu Feb 21 02:09:03 2008
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public class ContentHeaderBody implements AMQBody
{
@@ -108,6 +110,12 @@
buffer.putLong(bodySize);
EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags());
properties.writePropertyListPayload(buffer);
+ }
+
+ /**
+  * Hands this content header to the given session by calling
+  * {@code session.contentHeaderReceived(channelId, this)}.
+  *
+  * @param channelId the channel id passed through to the session
+  * @param session the session that receives this header
+  * @throws AMQException if thrown by the session's contentHeaderReceived
+  */
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.contentHeaderReceived(channelId, this);
}
public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties,
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Thu Feb 21 02:09:03 2008
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public class HeartbeatBody implements AMQBody
{
@@ -53,6 +55,12 @@
public void writePayload(ByteBuffer buffer)
{
+ }
+
+ /**
+  * Hands this heartbeat to the given session by calling
+  * {@code session.heartbeatBodyReceived(channelId, this)}.
+  *
+  * @param channelId the channel id passed through to the session
+  * @param session the session that receives this heartbeat
+  * @throws AMQException if thrown by the session's heartbeatBodyReceived
+  */
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.heartbeatBodyReceived(channelId, this);
}
protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Thu Feb 21 02:09:03 2008
@@ -70,7 +70,7 @@
return new MethodConverter_0_9.MessagePublishInfoImpl(exchange,
publishBody.getImmediate(),
publishBody.getMandatory(),
- routingKey == null ? null : routingKey.intern());
+ routingKey);
}
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/Job.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/Job.java Thu Feb 21 02:09:03 2008
@@ -94,21 +94,23 @@
/**
* Sequentially processes, up to the maximum number per job, the aggregated continuations in enqueued in this job.
*/
- void processAll()
+ boolean processAll()
{
// limit the number of events processed in one run
- for (int i = 0; i < _maxEvents; i++)
+ // Returns true once the queue has been drained, false if the per-run limit was reached first.
+ // A positive limit allows exactly _maxEvents events per run; the former pre-decrement test
+ // stopped one short, so a limit of 1 processed nothing and the job was rescheduled forever.
+ // A non-positive limit keeps its previous meaning of "no limit".
+ int i = _maxEvents;
+ while( (_maxEvents <= 0) || (i-- > 0) )
{
Event e = _eventQueue.poll();
if (e == null)
{
- break;
+ return true;
}
else
{
e.process(_session);
}
}
+ return false;
}
/**
@@ -144,9 +146,15 @@
*/
public void run()
{
- processAll();
- deactivate();
- _completionHandler.completed(_session, this);
+ // processAll() returns true when it found the event queue empty.
+ if(processAll())
+ {
+ deactivate();
+ _completionHandler.completed(_session, this);
+ }
+ else
+ {
+ // The per-run limit was reached: the job is not deactivated here, and the
+ // handler is told that it did not complete.
+ _completionHandler.notCompleted(_session, this);
+ }
}
/**
@@ -158,5 +166,7 @@
static interface JobCompletionHandler
{
public void completed(IoSession session, Job job);
+
+ public void notCompleted(final IoSession session, final Job job);
}
}
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Thu Feb 21 02:09:03 2008
@@ -29,8 +29,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ExecutorService;
/**
* PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it
@@ -84,9 +84,6 @@
/** Used for debugging purposes. */
private static final Logger _logger = LoggerFactory.getLogger(PoolingFilter.class);
- /** Holds a mapping from Mina sessions to batched jobs for execution. */
- private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>();
-
/** Holds the managed reference to obtain the executor for the batched jobs. */
private final ReferenceCountingExecutorService _poolReference;
@@ -94,7 +91,9 @@
private final String _name;
/** Defines the maximum number of events that will be batched into a single job. */
- private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+ static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+
+ private final int _maxEvents;
/**
* Creates a named pooling filter, on the specified shared thread pool.
@@ -102,10 +101,11 @@
* @param refCountingPool The thread pool reference.
* @param name The identifying name of the filter type.
*/
- public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+ public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents)
{
_poolReference = refCountingPool;
_name = name;
+ _maxEvents = maxEvents;
}
/**
@@ -160,20 +160,34 @@
/**
* Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
*
- * @param session The Mina session to work in.
+ * @param job The job.
* @param event The event to hand off asynchronously.
*/
- void fireAsynchEvent(IoSession session, Event event)
+ void fireAsynchEvent(Job job, Event event)
{
- Job job = getJobForSession(session);
+
// job.acquire(); //prevents this job being removed from _jobs
job.add(event);
- // Additional checks on pool to check that it hasn't shutdown.
- // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool
- if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown())
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
{
- _poolReference.getPool().execute(job);
+ return;
+ }
+
+ // rather than perform additional checks on pool to check that it hasn't shutdown.
+ // catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate())
+ {
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
}
}
@@ -186,7 +200,7 @@
*/
public void createNewJobForSession(IoSession session)
{
+ // Keep using the limit this filter was constructed with. Passing the global MAX_JOB_EVENTS
+ // here would ignore the constructor's maxEvents argument, so the separate read and write
+ // limits supplied by the subclasses would never take effect.
Job job = new Job(session, this, _maxEvents);
session.setAttribute(_name, job);
}
@@ -197,7 +211,7 @@
*
* @return The Job for this filter to place asynchronous events into.
*/
- private Job getJobForSession(IoSession session)
+ public Job getJobForSession(IoSession session)
{
return (Job) session.getAttribute(_name);
}
@@ -233,17 +247,57 @@
// }
// }
// else
+
+
if (!job.isComplete())
{
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+
// ritchiem : 2006-12-13 Do we need to perform the additional checks here?
// Can the pool be shutdown at this point?
- if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown())
+ if (job.activate())
{
- _poolReference.getPool().execute(job);
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+
}
}
}
+ /**
+  * Resubmits a job that reported it had not completed to the executor obtained from the
+  * pool reference. Does nothing when no executor is available, and only logs a warning
+  * when the executor rejects the job.
+  *
+  * @param session the session the job belongs to (not used here)
+  * @param job the job to execute again
+  */
+ public void notCompleted(IoSession session, Job job)
+ {
+     final ExecutorService executor = _poolReference.getPool();
+     if(executor != null)
+     {
+         try
+         {
+             executor.execute(job);
+         }
+         catch(RejectedExecutionException rejected)
+         {
+             _logger.warn("Thread pool shutdown while tasks still outstanding");
+         }
+     }
+ }
+
+
+
/**
* No-op pass through filter to the next filter in the chain.
*
@@ -400,7 +454,7 @@
*/
public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
- super(refCountingPool, name);
+ super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS));
}
/**
@@ -412,8 +466,8 @@
*/
public void messageReceived(NextFilter nextFilter, final IoSession session, Object message)
{
-
- fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message));
+ Job job = getJobForSession(session);
+ fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message));
}
/**
@@ -424,7 +478,8 @@
*/
public void sessionClosed(final NextFilter nextFilter, final IoSession session)
{
- fireAsynchEvent(session, new CloseEvent(nextFilter));
+ Job job = getJobForSession(session);
+ fireAsynchEvent(job, new CloseEvent(nextFilter));
}
}
@@ -442,7 +497,7 @@
*/
public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
- super(refCountingPool, name);
+ super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS));
}
/**
@@ -454,7 +509,8 @@
*/
public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
{
- fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest));
+ Job job = getJobForSession(session);
+ fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest));
}
/**
@@ -465,7 +521,8 @@
*/
public void sessionClosed(final NextFilter nextFilter, final IoSession session)
{
- fireAsynchEvent(session, new CloseEvent(nextFilter));
+ Job job = getJobForSession(session);
+ fireAsynchEvent(job, new CloseEvent(nextFilter));
}
}
}
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java Thu Feb 21 02:09:03 2008
@@ -21,6 +21,7 @@
package org.apache.qpid.protocol;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.AMQException;
/**
* AMQMethodListener is a listener that receives notifications of AMQP methods. The methods are packaged as events in
@@ -57,7 +58,7 @@
*
* @todo Consider narrowing the exception.
*/
- <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws Exception;
+ <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException;
/**
* Notifies the listener of an error on the event context to which it is listening. The listener should perform
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java Thu Feb 21 02:09:03 2008
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.protocol;
-import org.apache.qpid.framing.VersionSpecificRegistry;
-import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
/**
* AMQVersionAwareProtocolSession is implemented by all AMQP session classes, that need to provide an awareness to
@@ -46,4 +46,12 @@
// public VersionSpecificRegistry getRegistry();
MethodRegistry getMethodRegistry();
+
+
+ public void methodFrameReceived(int channelId, AMQMethodBody body) throws AMQException;
+ public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException;
+ public void contentBodyReceived(int channelId, ContentBody body) throws AMQException;
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
+
+
}
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Thu Feb 21 02:09:03 2008
@@ -24,20 +24,15 @@
import org.apache.log4j.Logger;
-import org.apache.mina.common.IoSession;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
@@ -62,7 +57,7 @@
AMQQueue queue =
new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()), false,
new AMQShortString("test"), true, _protocolSession.getVirtualHost());
- AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore, null);
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
channel.setDefaultQueue(queue);
_protocolSession.addChannel(channel);
channelCount = _mbean.channels().size();
@@ -73,7 +68,7 @@
assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
// check APIs
- AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _messageStore, null);
+ AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _messageStore);
channel3.setLocalTransactional();
_protocolSession.addChannel(channel3);
_mbean.rollbackTransactions(2);
@@ -93,14 +88,14 @@
}
// check if closing of session works
- _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _messageStore, null));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _messageStore));
_mbean.closeConnection();
try
{
channelCount = _mbean.channels().size();
assertTrue(channelCount == 0);
// session is now closed so adding another channel should throw an exception
- _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _messageStore, null));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _messageStore));
fail();
}
catch (AMQException ex)
@@ -119,7 +114,7 @@
new AMQMinaProtocolSession(new MockIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true),
null);
_protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test"));
- _channel = new AMQChannel(_protocolSession, 1, _messageStore, null);
+ _channel = new AMQChannel(_protocolSession, 1, _messageStore);
_protocolSession.addChannel(_channel);
_mbean = (AMQProtocolSessionMBean) _protocolSession.getManagedObject();
}
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java Thu Feb 21 02:09:03 2008
@@ -21,22 +21,12 @@
package org.apache.qpid.server.protocol;
import junit.framework.TestCase;
-import org.apache.mina.common.IoSession;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.framing.AMQShortString;
-
-import javax.management.JMException;
/** Test class to test MBean operations for AMQMinaProtocolSession. */
public class MaxChannelsTest extends TestCase
@@ -65,7 +55,7 @@
{
for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
{
- _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null));
}
}
catch (AMQException e)
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Thu Feb 21 02:09:03 2008
@@ -24,7 +24,6 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -37,7 +36,6 @@
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
import java.util.LinkedList;
@@ -77,7 +75,7 @@
super.setUp();
_messageStore = new TestableMemoryMessageStore();
_protocolSession = new MockProtocolSession(_messageStore);
- _channel = new AMQChannel(_protocolSession,5, _messageStore, null/*dont need exchange registry*/);
+ _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
_protocolSession.addChannel(_channel);
_subscriptionManager = new SubscriptionSet();
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?rev=629731&r1=629730&r2=629731&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Thu Feb 21 02:09:03 2008
@@ -190,6 +190,26 @@
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ /** Stub: this mock session ignores received method frames. */
+ public void methodFrameReceived(int channelId, AMQMethodBody body)
+ {
+ // No-op in this mock.
+ }
+
+ /** Stub: this mock session ignores received content headers. */
+ public void contentHeaderReceived(int channelId, ContentHeaderBody body)
+ {
+ // No-op in this mock.
+ }
+
+ /** Stub: this mock session ignores received content bodies. */
+ public void contentBodyReceived(int channelId, ContentBody body)
+ {
+ // No-op in this mock.
+ }
+
+ /** Stub: this mock session ignores received heartbeats. */
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
+ {
+ // No-op in this mock.
+ }
+
public MethodDispatcher getMethodDispatcher()
{
return null; //To change body of implemented methods use File | Settings | File Templates.