You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/09/01 18:27:55 UTC

svn commit: r810110 [2/2] - in /qpid/branches/java-network-refactor/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/qpid/server/connection/ broker/src/ma...

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Tue Sep  1 16:27:52 2009
@@ -20,14 +20,21 @@
  */
 package org.apache.qpid.codec;
 
+import java.util.ArrayList;
+
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.SimpleByteBufferAllocator;
 import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 
+import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQDataBlockDecoder;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBodyFactory;
+import org.apache.qpid.framing.AMQProtocolVersionException;
 import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 /**
  * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
@@ -62,14 +69,19 @@
     private boolean _expectProtocolInitiation;
     private boolean firstDecode = true;
 
+    private AMQMethodBodyFactory _bodyFactory;
+
+    private ByteBuffer _remainingBuf;
+    
     /**
      * Creates a new AMQP decoder.
      *
      * @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation.
      */
-    public AMQDecoder(boolean expectProtocolInitiation)
+    public AMQDecoder(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session)
     {
         _expectProtocolInitiation = expectProtocolInitiation;
+        _bodyFactory = new AMQMethodBodyFactory(session);
     }
 
     /**
@@ -120,7 +132,7 @@
     protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
     {
         int pos = in.position();
-        boolean enoughData = _dataBlockDecoder.decodable(session, in);
+        boolean enoughData = _dataBlockDecoder.decodable(in.buf());
         in.position(pos);
         if (!enoughData)
         {
@@ -149,7 +161,7 @@
      */
     private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
     {
-        boolean enoughData = _piDecoder.decodable(session, in);
+        boolean enoughData = _piDecoder.decodable(in.buf());
         if (!enoughData)
         {
             // returning false means it will leave the contents in the buffer and
@@ -158,7 +170,8 @@
         }
         else
         {
-            _piDecoder.decode(session, in, out);
+            ProtocolInitiation pi = new ProtocolInitiation(in.buf());
+            out.write(pi);
 
             return true;
         }
@@ -177,7 +190,7 @@
     }
 
 
- /**
+    /**
      * Cumulates content of <tt>in</tt> into internal buffer and forwards
      * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
      * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
@@ -268,4 +281,60 @@
         session.setAttribute( BUFFER, remainingBuf );
     }
 
+    public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
+    {
+
+        // get prior remaining data from accumulator
+        ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
+        ByteBuffer msg;
+        // if we have a session buffer, append data to that otherwise
+        // use the buffer read from the network directly
+        if( _remainingBuf != null )
+        {
+            _remainingBuf.put(buf);
+            _remainingBuf.flip();
+            msg = _remainingBuf;
+        }
+        else
+        {
+            msg = ByteBuffer.wrap(buf);
+        }
+        
+        if (_expectProtocolInitiation  
+            || (firstDecode
+                && (msg.remaining() > 0)
+                && (msg.get(msg.position()) == (byte)'A')))
+        {
+            if (_piDecoder.decodable(msg.buf()))
+            {
+                dataBlocks.add(new ProtocolInitiation(msg.buf()));
+            }
+        }
+        else
+        {
+            boolean enoughData = true;
+            while (enoughData)
+            {
+                int pos = msg.position();
+
+                enoughData = _dataBlockDecoder.decodable(msg);
+                msg.position(pos);
+                if (enoughData)
+                {
+                    dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
+                }
+                else
+                {
+                    _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
+                    _remainingBuf.setAutoExpand(true);
+                    _remainingBuf.put(msg);
+                }
+            }
+        }
+        if(firstDecode && dataBlocks.size() > 0)
+        {
+            firstDecode = false;
+        }
+        return dataBlocks;
+    }
 }

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Tue Sep  1 16:27:52 2009
@@ -47,7 +47,7 @@
     public AMQDataBlockDecoder()
     { }
 
-    public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException
+    public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
     {
         final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
         // type, channel, body length and end byte
@@ -56,14 +56,15 @@
             return false;
         }
 
-        in.skip(1 + 2);
-        final long bodySize = in.getUnsignedInt();
+        in.position(in.position() + 1 + 2);
+        // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() 
+        final long bodySize = in.getInt() & 0xffffffffL; 
 
         return (remainingAfterAttributes >= bodySize);
 
     }
 
-    protected Object createAndPopulateFrame(IoSession session, ByteBuffer in)
+    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
         throws AMQFrameDecodingException, AMQProtocolVersionException
     {
         final byte type = in.get();
@@ -71,15 +72,7 @@
         BodyFactory bodyFactory;
         if (type == AMQMethodBody.TYPE)
         {
-            bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
-            if (bodyFactory == null)
-            {
-                AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
-                bodyFactory = new AMQMethodBodyFactory(protocolSession);
-                session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
-
-            }
-
+            bodyFactory = methodBodyFactory;
         }
         else
         {
@@ -115,6 +108,24 @@
 
     public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
     {
-        out.write(createAndPopulateFrame(session, in));
+        AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
+        if (bodyFactory == null)
+        {
+            AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
+            bodyFactory = new AMQMethodBodyFactory(protocolSession);
+            session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
+        }
+        
+        out.write(createAndPopulateFrame(bodyFactory, in));
+    }
+
+    public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
+    {
+        return decodable(msg.buf());
+    }
+
+    public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
+    {
+        return createAndPopulateFrame(factory, ByteBuffer.wrap(msg));
     }
 }

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java Tue Sep  1 16:27:52 2009
@@ -50,7 +50,7 @@
         {
             _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'");
         }
-
+        
         out.write(buffer);
     }
 

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Tue Sep  1 16:27:52 2009
@@ -20,12 +20,10 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 import org.apache.qpid.AMQException;
 
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 
 public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
 {
@@ -53,13 +51,12 @@
         _protocolMajor = protocolMajor;
         _protocolMinor = protocolMinor;
     }
-
+    
     public ProtocolInitiation(ProtocolVersion pv)
     {
         this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
     }
 
-
     public ProtocolInitiation(ByteBuffer in)
     {
         _protocolHeader = new byte[4];
@@ -71,6 +68,11 @@
         _protocolMinor = in.get();
     }
 
+    public void writePayload(org.apache.mina.common.ByteBuffer buffer)
+    {
+        writePayload(buffer.buf());
+    }
+    
     public long getSize()
     {
         return 4 + 1 + 1 + 1 + 1;
@@ -127,16 +129,11 @@
          * @return true if we have enough data to decode the PI frame fully, false if more
          * data is required
          */
-        public boolean decodable(IoSession session, ByteBuffer in)
+        public boolean decodable(ByteBuffer in)
         {
             return (in.remaining() >= 8);
         }
 
-        public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
-        {
-            ProtocolInitiation pi = new ProtocolInitiation(in);
-            out.write(pi);
-        }
     }
 
     public ProtocolVersion checkVersion() throws AMQException
@@ -192,4 +189,5 @@
         buffer.append(Integer.toHexString(_protocolMinor));
         return buffer.toString();
     }
+
 }

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java Tue Sep  1 16:27:52 2009
@@ -45,21 +45,31 @@
  *       a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract,
  *       it is really an interface, so could just drop it and use the continuation interface instead.
  */
-public abstract class Event
+public class Event
 {
+    private Runnable _runner;
+
+    public Event()
+    {
+        
+    }
+    
     /**
      * Creates a continuation.
      */
-    public Event()
-    { }
+    public Event(Runnable runner)
+    { 
+        _runner = runner;
+    }
 
     /**
-     * Processes the continuation in the context of a Mina session.
-     *
-     * @param session The Mina session.
+     * Processes the continuation
      */
-    public abstract void process(IoSession session);
-
+    public void process()
+    {
+        _runner.run();
+    }
+    
     /**
      * A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter.
      *
@@ -68,22 +78,22 @@
      * <tr><td> Pass a Mina messageReceived event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession}
      * </table>
      */
-    public static final class ReceivedEvent extends Event
+    public static final class MinaReceivedEvent extends Event
     {
         private final Object _data;
-
         private final IoFilter.NextFilter _nextFilter;
+        private final IoSession _session;
 
-        public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data)
+        public MinaReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data, final IoSession session)
         {
-            super();
             _nextFilter = nextFilter;
             _data = data;
+            _session = session;
         }
 
-        public void process(IoSession session)
+        public void process()
         {
-            _nextFilter.messageReceived(session, _data);
+            _nextFilter.messageReceived(_session, _data);
         }
 
         public IoFilter.NextFilter getNextFilter()
@@ -101,21 +111,22 @@
      *     <td> {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession}
      * </table>
      */
-    public static final class WriteEvent extends Event
+    public static final class MinaWriteEvent extends Event
     {
         private final IoFilter.WriteRequest _data;
         private final IoFilter.NextFilter _nextFilter;
+        private IoSession _session;
 
-        public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data)
+        public MinaWriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data, final IoSession session)
         {
-            super();
             _nextFilter = nextFilter;
             _data = data;
+            _session = session;
         }
 
-        public void process(IoSession session)
+        public void process()
         {
-            _nextFilter.filterWrite(session, _data);
+            _nextFilter.filterWrite(_session, _data);
         }
 
         public IoFilter.NextFilter getNextFilter()
@@ -135,16 +146,17 @@
     public static final class CloseEvent extends Event
     {
         private final IoFilter.NextFilter _nextFilter;
+        private final IoSession _session;
 
-        public CloseEvent(final IoFilter.NextFilter nextFilter)
+        public CloseEvent(final IoFilter.NextFilter nextFilter, final IoSession session)
         {
-            super();
             _nextFilter = nextFilter;
+            _session = session;
         }
 
-        public void process(IoSession session)
+        public void process()
         {
-            _nextFilter.sessionClosed(session);
+            _nextFilter.sessionClosed(_session);
         }
 
         public IoFilter.NextFilter getNextFilter()

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java Tue Sep  1 16:27:52 2009
@@ -55,9 +55,6 @@
     /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
     private final int _maxEvents;
 
-    /** The Mina session. */
-    private final IoSession _session;
-
     /** Holds the queue of events that make up the job. */
     private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
 
@@ -79,7 +76,13 @@
      */
     Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
     {
-        _session = session;
+        _completionHandler = completionHandler;
+        _maxEvents = maxEvents;
+        _readJob = readJob;
+    }
+
+    public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob)
+    {
         _completionHandler = completionHandler;
         _maxEvents = maxEvents;
         _readJob = readJob;
@@ -90,7 +93,7 @@
      *
      * @param evt The continuation to enqueue.
      */
-    void add(Event evt)
+    public void add(Event evt)
     {
         _eventQueue.add(evt);
     }
@@ -111,7 +114,7 @@
             }
             else
             {
-                e.process(_session);
+                e.process();
             }
         }
         return false;
@@ -153,30 +156,19 @@
         if(processAll())
         {
             deactivate();
-            _completionHandler.completed(_session, this);
+            _completionHandler.completed(this);
         }
         else
         {
-            _completionHandler.notCompleted(_session, this);
+            _completionHandler.notCompleted(this);
         }
     }
 
-    public boolean isReadJob()
-    {
-        return _readJob;
-    }
-
     public boolean isRead()
     {
         return _readJob;
     }
 
-    public boolean isWrite()
-    {
-        return !_readJob;
-    }
-
-
     /**
      * Another interface for a continuation.
      *
@@ -185,8 +177,8 @@
      */
     static interface JobCompletionHandler
     {
-        public void completed(IoSession session, Job job);
+        public void completed(Job job);
 
-        public void notCompleted(final IoSession session, final Job job);
+        public void notCompleted(final Job job);
     }
 }

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Tue Sep  1 16:27:52 2009
@@ -20,19 +20,17 @@
  */
 package org.apache.qpid.pool;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoFilterAdapter;
 import org.apache.mina.common.IoSession;
 import org.apache.qpid.pool.Event.CloseEvent;
-
+import org.apache.qpid.pool.Event.MinaReceivedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ExecutorService;
-
 /**
  * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it
  * adds no behaviour by default to the filter chain, it is abstract.
@@ -74,7 +72,7 @@
     private final String _name;
 
     /** Defines the maximum number of events that will be batched into a single job. */
-    static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+    public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
 
     private final int _maxEvents;
 
@@ -188,7 +186,7 @@
         Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter);
         session.setAttribute(_name, job);
     }
-
+    
     /**
      * Retrieves this filters Job, by this filters name, from the Mina session.
      *
@@ -208,7 +206,7 @@
      * @param session The Mina session to work in.
      * @param job     The job that completed.
      */
-    public void completed(IoSession session, Job job)
+    public void completed(Job job)
     {
 
 
@@ -239,7 +237,7 @@
         }
     }
 
-    public void notCompleted(IoSession session, Job job)
+    public void notCompleted(Job job)
     {
         final ExecutorService pool = _poolReference.getPool();
 
@@ -430,7 +428,7 @@
         public void messageReceived(NextFilter nextFilter, final IoSession session, Object message)
         {
             Job job = getJobForSession(session);
-            fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message));
+            fireAsynchEvent(job, new MinaReceivedEvent(nextFilter, message, session));
         }
 
         /**
@@ -442,7 +440,7 @@
         public void sessionClosed(final NextFilter nextFilter, final IoSession session)
         {
             Job job = getJobForSession(session);
-            fireAsynchEvent(job, new CloseEvent(nextFilter));
+            fireAsynchEvent(job, new CloseEvent(nextFilter, session));
         }
     }
 
@@ -473,7 +471,7 @@
         public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
         {
             Job job = getJobForSession(session);
-            fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest));
+            fireAsynchEvent(job, new Event.MinaWriteEvent(nextFilter, writeRequest, session));
         }
 
         /**
@@ -485,7 +483,8 @@
         public void sessionClosed(final NextFilter nextFilter, final IoSession session)
         {
             Job job = getJobForSession(session);
-            fireAsynchEvent(job, new CloseEvent(nextFilter));
+            fireAsynchEvent(job, new CloseEvent(nextFilter, session));
         }
     }
+
 }

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java Tue Sep  1 16:27:52 2009
@@ -23,5 +23,4 @@
 public interface ReadWriteRunnable extends Runnable
 {
     boolean isRead();
-    boolean isWrite();
 }

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java Tue Sep  1 16:27:52 2009
@@ -20,10 +20,12 @@
  */
 package org.apache.qpid.protocol;
 
+import org.apache.qpid.transport.NetworkDriver;
+
 public interface ProtocolEngineFactory  
 { 
  
   // Returns a new instance of a ProtocolEngine 
-  ProtocolEngine newProtocolEngine(); 
+  ProtocolEngine newProtocolEngine(NetworkDriver networkDriver); 
    
 } 
\ No newline at end of file

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java Tue Sep  1 16:27:52 2009
@@ -28,17 +28,17 @@
 public interface NetworkDriverConfiguration  
 {  
     // Taken from Socket  
-    boolean getKeepAlive();
-    boolean getOOBInline();
-    boolean getReuseAddress();
+    Boolean getKeepAlive();
+    Boolean getOOBInline();
+    Boolean getReuseAddress();
     Integer getSoLinger(); // null means off 
-    int getSoTimeout(); 
-    boolean getTcpNoDelay(); 
-    int getTrafficClass();
+    Integer getSoTimeout(); 
+    Boolean getTcpNoDelay(); 
+    Integer getTrafficClass();
 
     // The amount of memory in bytes to allocate to the incoming buffer 
-    int getReceiveBufferSize();  
+    Integer getReceiveBufferSize();  
 
     // The amount of memory in bytes to allocate to the outgoing buffer 
-    int getSendBufferSize();  
+    Integer getSendBufferSize();  
 } 

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java Tue Sep  1 16:27:52 2009
@@ -181,6 +181,7 @@
     {
         return _ioSession.getLocalAddress();
     }
+    
 
     public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
             SSLEngine sslEngine) throws OpenException
@@ -251,6 +252,10 @@
 
     public void close()
     {
+        if (_lastWriteFuture != null)
+        {
+            _lastWriteFuture.join();
+        }
         if (_acceptor != null)
         {
             _acceptor.unbindAll();
@@ -359,9 +364,14 @@
 
                 protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
             }
-
+            
+            if (_ioSession == null)
+            {
+                _ioSession = protocolSession;
+            }
+            
             // Set up the protocol engine
-            ProtocolEngine protocolEngine = _factory.newProtocolEngine();
+            ProtocolEngine protocolEngine = _factory.newProtocolEngine(this);
             MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession);
             protocolEngine.setNetworkDriver(newDriver);
             protocolSession.setAttachment(protocolEngine);
@@ -385,4 +395,10 @@
        return _protocolEngine;
     }
 
+    public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections)
+    {
+        _factory = engineFactory;
+        _acceptingConnections = acceptingConnections;
+    }
+
 }

Added: qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=810110&view=auto
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (added)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Tue Sep  1 16:27:52 2009
@@ -0,0 +1,130 @@
+package org.apache.qpid.codec;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.HeartbeatBody;
+
+public class AMQDecoderTest extends TestCase
+{
+
+    private AMQCodecFactory _factory;
+    private AMQDecoder _decoder;
+
+
+    public void setUp()
+    {
+        _factory = new AMQCodecFactory(false, null);
+        _decoder = _factory.getDecoder();
+    }
+   
+    
+    public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+    {
+        ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer();
+        ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg);
+        if (frames.get(0) instanceof AMQFrame)
+        {
+            assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType());
+        }
+        else
+        {
+            fail("decode was not a frame");
+        }
+    }
+    
+    public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+    {
+        ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer();
+        ByteBuffer msgA = msg.slice();
+        int msgbPos = msg.remaining() / 2;
+        int msgaLimit = msg.remaining() - msgbPos;
+        msgA.limit(msgaLimit);
+        msg.position(msgbPos);
+        ByteBuffer msgB = msg.slice();
+        ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msgA);
+        assertEquals(0, frames.size());
+        frames = _decoder.decodeBuffer(msgB);
+        assertEquals(1, frames.size());
+        if (frames.get(0) instanceof AMQFrame)
+        {
+            assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType());
+        }
+        else
+        {
+            fail("decode was not a frame");
+        }
+    }
+    
+    public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+    {
+        ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer();
+        ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer();
+        ByteBuffer msg = ByteBuffer.allocate(msgA.remaining() + msgB.remaining());
+        msg.put(msgA);
+        msg.put(msgB);
+        msg.flip();
+        ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg);
+        assertEquals(2, frames.size());
+        for (AMQDataBlock frame : frames)
+        {
+            if (frame instanceof AMQFrame)
+            {
+                assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frame).getBodyFrame().getFrameType());
+            }
+            else
+            {
+                fail("decode was not a frame");
+            }
+        }
+    }
+    
+    public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+    {
+        ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer();
+        ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer();
+        ByteBuffer msgC = HeartbeatBody.FRAME.toNioByteBuffer();
+        
+        ByteBuffer sliceA = ByteBuffer.allocate(msgA.remaining() + msgB.remaining() / 2);
+        sliceA.put(msgA);
+        int limit = msgB.limit();
+        int pos = msgB.remaining() / 2;
+        msgB.limit(pos);
+        sliceA.put(msgB);
+        sliceA.flip();
+        msgB.limit(limit);
+        msgB.position(pos);
+        
+        ByteBuffer sliceB = ByteBuffer.allocate(msgB.remaining() + pos);
+        sliceB.put(msgB);
+        msgC.limit(pos);
+        sliceB.put(msgC);
+        sliceB.flip();
+        msgC.limit(limit);
+        
+        ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(sliceA);
+        assertEquals(1, frames.size());
+        frames = _decoder.decodeBuffer(sliceB);
+        assertEquals(1, frames.size());
+        frames = _decoder.decodeBuffer(msgC);
+        assertEquals(1, frames.size());
+        for (AMQDataBlock frame : frames)
+        {
+            if (frame instanceof AMQFrame)
+            {
+                assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frame).getBodyFrame().getFrameType());
+            }
+            else
+            {
+                fail("decode was not a frame");
+            }
+        }
+    }
+    
+}

Added: qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java?rev=810110&view=auto
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java (added)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java Tue Sep  1 16:27:52 2009
@@ -0,0 +1,95 @@
+package org.apache.qpid.codec;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Sender;
+
+public class MockAMQVersionAwareProtocolSession implements AMQVersionAwareProtocolSession
+{
+
+    @Override
+    public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
+    {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
+    {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public MethodRegistry getMethodRegistry()
+    {
+        return MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+    }
+
+    @Override
+    public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
+    {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void init()
+    {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void methodFrameReceived(int channelId, AMQMethodBody body) throws AMQException
+    {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void setSender(Sender<ByteBuffer> sender)
+    {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void writeFrame(AMQDataBlock frame)
+    {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public byte getProtocolMajorVersion()
+    {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public byte getProtocolMinorVersion()
+    {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public ProtocolVersion getProtocolVersion()
+    {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java Tue Sep  1 16:27:52 2009
@@ -299,7 +299,7 @@
         _countingEngine.setNewLatch(TEST_DATA.getBytes().length);
         _client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
         _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS);
-        assertEquals("Exception should not been thrown", 0, 
+        assertEquals("Exception should have been thrown", 0, 
                 _countingEngine.getExceptionLatch().getCount());
     } 
     
@@ -321,11 +321,12 @@
     {
         EchoProtocolEngine _engine = null;
         
-        public ProtocolEngine newProtocolEngine()
+        public ProtocolEngine newProtocolEngine(NetworkDriver driver)
         {
             if (_engine == null)
             {
                 _engine = new EchoProtocolEngine();
+                _engine.setNetworkDriver(driver);
             }
             return getEngine();
         }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org