You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/23 12:01:08 UTC

svn commit: r1560634 [5/7] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/ qpid/bin/ qpid/cpp/ qpid/cpp/bindings/qmf2/examples/cpp/ qpid/cpp/bindings/qpid/dotnet/src/ qpid/cpp/bindings/qpid/dotnet/src/msvc10/ qpid/cpp/bindings/qpid/dotnet/src/msvc9/ q...

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java Thu Jan 23 11:01:02 2014
@@ -24,96 +24,44 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.Filterable;
 import org.apache.qpid.server.store.StoredMessage;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource
+public class IncomingMessage
 {
 
-    /** Used for debugging purposes. */
-    private static final Logger _logger = Logger.getLogger(IncomingMessage.class);
-
     private final MessagePublishInfo _messagePublishInfo;
     private ContentHeaderBody _contentHeaderBody;
-
+    private Exchange _exchange;
 
     /**
      * Keeps a track of how many bytes we have received in body frames
      */
     private long _bodyLengthReceived = 0;
+    private List<ContentBody> _contentChunks = new ArrayList<ContentBody>();
 
-    /**
-     * This is stored during routing, to know the queues to which this message should immediately be
-     * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
-     * by the message handle.
-     */
-    private List<? extends BaseQueue> _destinationQueues;
-
-    private long _expiration;
-
-    private Exchange _exchange;
-
-    private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
-
-    // we keep both the original meta data object and the store reference to it just in case the
-    // store would otherwise flow it to disk
-
-    private MessageMetaData _messageMetaData;
-
-    private StoredMessage<MessageMetaData> _storedMessageHandle;
-    private Object _connectionReference;
-
-
-    public IncomingMessage(
-            final MessagePublishInfo info
-    )
-    {
-        this(info, null);
-    }
-
-    public IncomingMessage(MessagePublishInfo info, Object reference)
+    public IncomingMessage(MessagePublishInfo info)
     {
         _messagePublishInfo = info;
-        _connectionReference = reference;
     }
 
-    public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
+    public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody)
     {
         _contentHeaderBody = contentHeaderBody;
     }
 
-    public void setExpiration()
-    {
-        _expiration = ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
-    }
-
-    public MessageMetaData headersReceived(long currentTime)
-    {
-        _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime);
-        return _messageMetaData;
-    }
-
-
-    public List<? extends BaseQueue> getDestinationQueues()
+    public MessagePublishInfo getMessagePublishInfo()
     {
-        return _destinationQueues;
+        return _messagePublishInfo;
     }
 
-    public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException
+    public void addContentBodyFrame(final ContentBody contentChunk) throws AMQException
     {
         _bodyLengthReceived += contentChunk.getSize();
         _contentChunks.add(contentChunk);
@@ -124,31 +72,14 @@ public class IncomingMessage implements 
         return (_bodyLengthReceived == getContentHeader().getBodySize());
     }
 
-    public AMQShortString getExchange()
+    public AMQShortString getExchangeName()
     {
         return _messagePublishInfo.getExchange();
     }
 
-    public String getRoutingKey()
-    {
-        return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
-    }
-
-    public String getBinding()
-    {
-        return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
-    }
-
-
-    public boolean isMandatory()
-    {
-        return _messagePublishInfo.isMandatory();
-    }
-
-
-    public boolean isImmediate()
+    public Exchange getExchange()
     {
-        return _messagePublishInfo.isImmediate();
+        return _exchange;
     }
 
     public ContentHeaderBody getContentHeader()
@@ -156,129 +87,24 @@ public class IncomingMessage implements 
         return _contentHeaderBody;
     }
 
-
-    public AMQMessageHeader getMessageHeader()
-    {
-        return _messageMetaData.getMessageHeader();
-    }
-
-    public boolean isPersistent()
-    {
-        return getContentHeader().getProperties() instanceof BasicContentHeaderProperties &&
-             ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() ==
-                                                             BasicContentHeaderProperties.PERSISTENT;
-    }
-
-    public boolean isRedelivered()
-    {
-        return false;
-    }
-
-
     public long getSize()
     {
         return getContentHeader().getBodySize();
     }
 
-    public long getMessageNumber()
-    {
-        return _storedMessageHandle.getMessageNumber();
-    }
-
     public void setExchange(final Exchange e)
     {
         _exchange = e;
     }
 
-    public void route()
-    {
-        enqueue(_exchange.route(this));
-
-    }
-
-    public void enqueue(final List<? extends BaseQueue> queues)
-    {
-        _destinationQueues = queues;
-    }
-
-    public MessagePublishInfo getMessagePublishInfo()
-    {
-        return _messagePublishInfo;
-    }
-
-    public long getExpiration()
-    {
-        return _expiration;
-    }
-
     public int getBodyCount() throws AMQException
     {
         return _contentChunks.size();
     }
 
-    public ContentChunk getContentChunk(int index)
+    public ContentBody getContentChunk(int index)
     {
         return _contentChunks.get(index);
     }
 
-
-    public int getContent(ByteBuffer buf, int offset)
-    {
-        int pos = 0;
-        int written = 0;
-        for(ContentChunk cb : _contentChunks)
-        {
-            ByteBuffer data = ByteBuffer.wrap(cb.getData());
-            if(offset+written >= pos && offset < pos + data.limit())
-            {
-                ByteBuffer src = data.duplicate();
-                src.position(offset+written - pos);
-                src = src.slice();
-
-                if(buf.remaining() < src.limit())
-                {
-                    src.limit(buf.remaining());
-                }
-                int count = src.limit();
-                buf.put(src);
-                written += count;
-                if(buf.remaining() == 0)
-                {
-                    break;
-                }
-            }
-            pos+=data.limit();
-        }
-        return written;
-
-    }
-
-
-    public ByteBuffer getContent(int offset, int size)
-    {
-        ByteBuffer buf = ByteBuffer.allocate(size);
-        getContent(buf,offset);
-        buf.flip();
-        return buf;
-    }
-
-    public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle)
-    {
-        _storedMessageHandle = storedMessageHandle;
-    }
-
-    public StoredMessage<MessageMetaData> getStoredMessage()
-    {
-        return _storedMessageHandle;
-    }
-
-    public Object getConnectionReference()
-    {
-        return _connectionReference;
-    }
-
-    public MessageMetaData getMessageMetaData()
-    {
-        return _messageMetaData;
-    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Thu Jan 23 11:01:02 2014
@@ -46,11 +46,10 @@ import java.util.Set;
  */
 public class MessageMetaData implements StorableMessageMetaData
 {
-    private MessagePublishInfo _messagePublishInfo;
+    private final MessagePublishInfo _messagePublishInfo;
 
-    private ContentHeaderBody _contentHeaderBody;
+    private final ContentHeaderBody _contentHeaderBody;
 
-    private int _contentChunkCount;
 
     private long _arrivalTime;
     private static final byte MANDATORY_FLAG = 1;
@@ -58,59 +57,36 @@ public class MessageMetaData implements 
     public static final MessageMetaDataType.Factory<MessageMetaData> FACTORY = new MetaDataFactory();
     private static final MessageMetaDataType_0_8 TYPE = new MessageMetaDataType_0_8();
 
-    public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
+    public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody)
     {
-        this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis());
+        this(publishBody,contentHeaderBody, System.currentTimeMillis());
     }
 
-    public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime)
+    public MessageMetaData(MessagePublishInfo publishBody,
+                           ContentHeaderBody contentHeaderBody,
+                           long arrivalTime)
     {
         _contentHeaderBody = contentHeaderBody;
         _messagePublishInfo = publishBody;
-        _contentChunkCount = contentChunkCount;
         _arrivalTime = arrivalTime;
     }
 
-    public int getContentChunkCount()
-    {
-        return _contentChunkCount;
-    }
-
-    public void setContentChunkCount(int contentChunkCount)
-    {
-        _contentChunkCount = contentChunkCount;
-    }
 
     public ContentHeaderBody getContentHeaderBody()
     {
         return _contentHeaderBody;
     }
 
-    public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
-    {
-        _contentHeaderBody = contentHeaderBody;
-    }
-
     public MessagePublishInfo getMessagePublishInfo()
     {
         return _messagePublishInfo;
     }
 
-    public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo)
-    {
-        _messagePublishInfo = messagePublishInfo;
-    }
-
     public long getArrivalTime()
     {
         return _arrivalTime;
     }
 
-    public void setArrivalTime(long arrivalTime)
-    {
-        _arrivalTime = arrivalTime;
-    }
-
     public MessageMetaDataType getType()
     {
         return TYPE;
@@ -169,8 +145,7 @@ public class MessageMetaData implements 
 
     public boolean isPersistent()
     {
-        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.getProperties());
-        return properties.getDeliveryMode() ==  BasicContentHeaderProperties.PERSISTENT;
+        return _contentHeaderBody.getProperties().getDeliveryMode() ==  BasicContentHeaderProperties.PERSISTENT;
     }
 
     private static class MetaDataFactory implements MessageMetaDataType.Factory
@@ -219,7 +194,7 @@ public class MessageMetaData implements 
                                 return routingKey;
                             }
                         };
-                return new MessageMetaData(publishBody, chb, 0, arrivalTime);
+                return new MessageMetaData(publishBody, chb, arrivalTime);
             }
             catch (AMQException e)
             {
@@ -242,7 +217,7 @@ public class MessageMetaData implements 
     {
         private BasicContentHeaderProperties getProperties()
         {
-            return (BasicContentHeaderProperties) getContentHeaderBody().getProperties();
+            return getContentHeaderBody().getProperties();
         }
 
         public String getUserId()
@@ -300,18 +275,6 @@ public class MessageMetaData implements 
             return getProperties().getReplyToAsString();
         }
 
-        public String getReplyToExchange()
-        {
-            // TODO
-            return getReplyTo();
-        }
-
-        public String getReplyToRoutingKey()
-        {
-            // TODO
-            return getReplyTo();
-        }
-
         public Object getHeader(String name)
         {
             FieldTable ft = getProperties().getHeaders();

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java Thu Jan 23 11:01:02 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
@@ -488,7 +489,7 @@ public abstract class SubscriptionImpl i
             {
                 AMQMessage message = (AMQMessage) entry.getMessage();
 
-                final Object publisherReference = message.getConnectionIdentifier();
+                final Object publisherReference = message.getConnectionReference();
 
                 // We don't want local messages so check to see if message is one we sent
                 Object localReference = getProtocolSession().getReference();
@@ -519,7 +520,7 @@ public abstract class SubscriptionImpl i
 
     private boolean checkFilters(QueueEntry msg)
     {
-        return (_filters == null) || _filters.allAllow(msg);
+        return (_filters == null) || _filters.allAllow(msg.asFilterable());
     }
 
     public boolean isAutoClose()

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Thu Jan 23 11:01:02 2014
@@ -119,8 +119,6 @@ public class AckTest extends QpidTestCas
                     return new AMQShortString("rk");
                 }
             };
-            final IncomingMessage msg = new IncomingMessage(publishBody);
-            //IncomingMessage msg2 = null;
             BasicContentHeaderProperties b = new BasicContentHeaderProperties();
             ContentHeaderBody cb = new ContentHeaderBody();
             cb.setProperties(b);
@@ -131,42 +129,35 @@ public class AckTest extends QpidTestCas
                 b.setDeliveryMode((byte) 2);
             }
 
-            msg.setContentHeaderBody(cb);
-
             // we increment the reference here since we are not delivering the messaging to any queues, which is where
             // the reference is normally incremented. The test is easier to construct if we have direct access to the
             // subscription
             ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
             qs.add(_queue);
-            msg.enqueue(qs);
-            MessageMetaData mmd = msg.headersReceived(System.currentTimeMillis());
+            MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis());
             final StoredMessage storedMessage = _messageStore.addMessage(mmd);
-            msg.setStoredMessage(storedMessage);
             final AMQMessage message = new AMQMessage(storedMessage);
-            if(msg.allContentReceived())
-            {
-                ServerTransaction txn = new AutoCommitTransaction(_messageStore);
-                txn.enqueue(_queue, message, new ServerTransaction.Action() {
-                    public void postCommit()
+            ServerTransaction txn = new AutoCommitTransaction(_messageStore);
+            txn.enqueue(_queue, message, new ServerTransaction.Action() {
+                public void postCommit()
+                {
+                    try
                     {
-                        try
-                        {
 
-                            _queue.enqueue(message);
-                        }
-                        catch (AMQException e)
-                        {
-                             throw new RuntimeException(e);
-                        }
+                        _queue.enqueue(message);
                     }
-
-                    public void onRollback()
+                    catch (AMQException e)
                     {
-                        //To change body of implemented methods use File | Settings | File Templates.
+                         throw new RuntimeException(e);
                     }
-                });
+                }
+
+                public void onRollback()
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+            });
 
-            }
             // we manually send the message to the subscription
             //_subscription.send(new QueueEntry(_queue,msg), _queue);
         }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Thu Jan 23 11:01:02 2014
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.security.auth.Subject;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
@@ -55,6 +56,7 @@ import org.apache.qpid.transport.network
 
 public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
 {
+    private static final Logger _logger = Logger.getLogger(InternalTestProtocolSession.class);
     // ChannelID(LIST)  -> LinkedList<Pair>
     private final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
     private AtomicInteger _deliveryCount = new AtomicInteger(0);
@@ -185,7 +187,7 @@ public class InternalTestProtocolSession
             }
             catch (InterruptedException e)
             {
-                e.printStackTrace();
+                _logger.error("Thread interupted", e);
             }
         }
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java Thu Jan 23 11:01:02 2014
@@ -57,9 +57,9 @@ public class MockStoredMessage implement
         {
             FieldTable headers = new FieldTable();
             headers.setString(headerName, headerValue == null? null :String.valueOf(headerValue));
-            ((BasicContentHeaderProperties)chb.getProperties()).setHeaders(headers);
+            ( chb.getProperties()).setHeaders(headers);
         }
-        _metaData = new MessageMetaData(info, chb, 0);
+        _metaData = new MessageMetaData(info, chb);
         _content = ByteBuffer.allocate(_metaData.getContentSize());
     }
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java Thu Jan 23 11:01:02 2014
@@ -81,7 +81,7 @@ public class ReferenceCountingTest exten
 
 
 
-        MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+        MessageMetaData mmd = new MessageMetaData(info, chb);
         StoredMessage storedMessage = _store.addMessage(mmd);
 
 
@@ -139,7 +139,7 @@ public class ReferenceCountingTest exten
 
         final ContentHeaderBody chb = createPersistentContentHeader();
 
-        MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+        MessageMetaData mmd = new MessageMetaData(info, chb);
         StoredMessage storedMessage = _store.addMessage(mmd);
 
         AMQMessage message = new AMQMessage(storedMessage);

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml Thu Jan 23 11:01:02 2014
@@ -40,6 +40,14 @@
       <version>0.26-SNAPSHOT</version>
       <scope>compile</scope>
     </dependency>
+
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j-version}</version>
+      <scope>compile</scope>
+    </dependency>
+
   </dependencies>
 
   <build>

Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0:r1558037-1560619

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Thu Jan 23 11:01:02 2014
@@ -51,6 +51,8 @@ public class Connection_1_0 implements C
     private final ConnectionEndpoint _conn;
     private final long _connectionId;
     private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
+    private final Object _reference = new Object();
+
 
 
     public static interface Task
@@ -79,6 +81,11 @@ public class Connection_1_0 implements C
 
     }
 
+    public Object getReference()
+    {
+        return _reference;
+    }
+
     public void remoteSessionCreation(SessionEndpoint endpoint)
     {
         Session_1_0 session = new Session_1_0(_vhost, this);

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Thu Jan 23 11:01:02 2014
@@ -27,6 +27,8 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.txn.ServerTransaction;
 
@@ -53,34 +55,71 @@ public class ExchangeDestination impleme
 
     public Outcome send(final Message_1_0 message, ServerTransaction txn)
     {
-        final List<? extends BaseQueue> queues = _exchange.route(message);
+        final InstanceProperties instanceProperties =
+            new InstanceProperties()
+            {
+
+                @Override
+                public Object getProperty(final Property prop)
+                {
+                    switch(prop)
+                    {
+                        case MANDATORY:
+                            return false;
+                        case REDELIVERED:
+                            return false;
+                        case PERSISTENT:
+                            return message.isPersistent();
+                        case IMMEDIATE:
+                            return false;
+                        case EXPIRATION:
+                            return message.getExpiration();
+                    }
+                    return null;
+                }};
 
-        txn.enqueue(queues,message, new ServerTransaction.Action()
+        List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties);
+
+        if(queues == null || queues.isEmpty())
         {
+            Exchange altExchange = _exchange.getAlternateExchange();
+            if(altExchange != null)
+            {
+                queues = altExchange.route(message, instanceProperties);
+            }
+        }
 
-            BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
+        if(queues != null && !queues.isEmpty())
+        {
+            final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
 
-            public void postCommit()
+            txn.enqueue(queues,message, new ServerTransaction.Action()
             {
-                for(int i = 0; i < _queues.length; i++)
+                MessageReference _reference = message.newReference();
+
+                public void postCommit()
                 {
-                    try
-                    {
-                        _queues[i].enqueue(message);
-                    }
-                    catch (AMQException e)
+                    for(int i = 0; i < baseQueues.length; i++)
                     {
-                        // TODO
-                        throw new RuntimeException(e);
+                        try
+                        {
+                            baseQueues[i].enqueue(message);
+                        }
+                        catch (AMQException e)
+                        {
+                            // TODO
+                            throw new RuntimeException(e);
+                        }
                     }
+                    _reference.release();
                 }
-            }
 
-            public void onRollback()
-            {
-                // NO-OP
-            }
-        });
+                public void onRollback()
+                {
+                    _reference.release();
+                }
+            });
+        }
 
         return ACCEPTED;
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Thu Jan 23 11:01:02 2014
@@ -272,7 +272,7 @@ public abstract class MessageConverter_t
                 @Override
                 public void remove()
                 {
-                    serverMessage.getStoredMessage().remove();
+                    throw new UnsupportedOperationException();
                 }
             };
         }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Thu Jan 23 11:01:02 2014
@@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol.
 
 import java.nio.ByteBuffer;
 import java.util.*;
+
+import org.apache.log4j.Logger;
 import org.apache.qpid.amqp_1_0.codec.ValueHandler;
 import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
 import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
@@ -44,6 +46,7 @@ import org.apache.qpid.server.store.Stor
 
 public class MessageMetaData_1_0 implements StorableMessageMetaData
 {
+    private static final Logger _logger = Logger.getLogger(MessageMetaData_1_0.class);
     // TODO move to somewhere more useful
     public static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
     public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory();
@@ -275,7 +278,7 @@ public class MessageMetaData_1_0 impleme
         }
         catch (AmqpErrorException e)
         {
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            _logger.error("Decoding read section error", e);
             throw new IllegalArgumentException(e);
         }
     }
@@ -501,16 +504,6 @@ public class MessageMetaData_1_0 impleme
             }
         }
 
-        public String getReplyToExchange()
-        {
-            return null;  //TODO
-        }
-
-        public String getReplyToRoutingKey()
-        {
-            return null;  //TODO
-        }
-
         public String getAppId()
         {
             //TODO

Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java:r1558037-1560619

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Thu Jan 23 11:01:02 2014
@@ -21,35 +21,22 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 
-import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.store.StoredMessage;
 
-public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundMessage
+public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
 {
 
-
-    private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater =
-            AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount");
-
-    private volatile int _referenceCount = 0;
-
-    private final StoredMessage<MessageMetaData_1_0> _storedMessage;
     private List<ByteBuffer> _fragments;
-    private WeakReference<Session_1_0> _session;
     private long _arrivalTime;
 
 
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
     {
-        _storedMessage = storedMessage;
-        _session = null;
+        super(storedMessage, null);
         _fragments = restoreFragments(storedMessage);
     }
 
@@ -75,11 +62,10 @@ public class Message_1_0 implements Serv
 
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
                        final List<ByteBuffer> fragments,
-                       final Session_1_0 session)
+                       final Object connectionReference)
     {
-        _storedMessage = storedMessage;
+        super(storedMessage, connectionReference);
         _fragments = fragments;
-        _session = new WeakReference<Session_1_0>(session);
         _arrivalTime = System.currentTimeMillis();
     }
 
@@ -98,7 +84,7 @@ public class Message_1_0 implements Serv
 
     private MessageMetaData_1_0 getMessageMetaData()
     {
-        return _storedMessage.getMetaData();
+        return getStoredMessage().getMetaData();
     }
 
     public MessageMetaData_1_0.MessageHeader_1_0 getMessageHeader()
@@ -106,16 +92,6 @@ public class Message_1_0 implements Serv
         return getMessageMetaData().getMessageHeader();
     }
 
-    public StoredMessage getStoredMessage()
-    {
-        return _storedMessage;
-    }
-
-    public boolean isPersistent()
-    {
-        return getMessageMetaData().isPersistent();
-    }
-
     public boolean isRedelivered()
     {
         // TODO
@@ -136,121 +112,19 @@ public class Message_1_0 implements Serv
         return size;
     }
 
-    public boolean isImmediate()
-    {
-        return false;
-    }
-
     public long getExpiration()
     {
         return getMessageHeader().getExpiration();
     }
 
-    public MessageReference<Message_1_0> newReference()
-    {
-        return new Reference(this);
-    }
-
-    public long getMessageNumber()
-    {
-        return _storedMessage.getMessageNumber();
-    }
-
     public long getArrivalTime()
     {
         return _arrivalTime;
     }
 
-    public int getContent(final ByteBuffer buf, final int offset)
-    {
-        return _storedMessage.getContent(offset, buf);
-    }
-
-    public ByteBuffer getContent(int offset, int size)
-    {
-        ByteBuffer buf = ByteBuffer.allocate(size);
-        buf.limit(getContent(buf, offset));
-
-        return buf;
-    }
-
     public List<ByteBuffer> getFragments()
     {
         return _fragments;
     }
 
-    public Session_1_0 getSession()
-    {
-        return _session == null ? null : _session.get();
-    }
-
-
-    public boolean incrementReference()
-    {
-        if(_refCountUpdater.incrementAndGet(this) <= 0)
-        {
-            _refCountUpdater.decrementAndGet(this);
-            return false;
-        }
-        else
-        {
-            return true;
-        }
-    }
-
-    /**
-     * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
-     * message store.
-     */
-
-    public void decrementReference()
-    {
-        int count = _refCountUpdater.decrementAndGet(this);
-
-        // note that the operation of decrementing the reference count and then removing the message does not
-        // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
-        // the message has been passed to all queues. i.e. we are
-        // not relying on the all the increments having taken place before the delivery manager decrements.
-        if (count == 0)
-        {
-            // set the reference count way below 0 so that we can detect that the message has been deleted
-            // this is to guard against the message being spontaneously recreated (from the mgmt console)
-            // by copying from other queues at the same time as it is being removed.
-            _refCountUpdater.set(this,Integer.MIN_VALUE/2);
-
-            // must check if the handle is null since there may be cases where we decide to throw away a message
-            // and the handle has not yet been constructed
-            if (_storedMessage != null)
-            {
-                _storedMessage.remove();
-            }
-        }
-        else
-        {
-            if (count < 0)
-            {
-                throw new RuntimeException("Reference count for message id " + getMessageNumber()
-                                                  + " has gone below 0.");
-            }
-        }
-    }
-
-    public static class Reference extends MessageReference<Message_1_0>
-    {
-        public Reference(Message_1_0 message)
-        {
-            super(message);
-        }
-
-        protected void onReference(Message_1_0 message)
-        {
-            message.incrementReference();
-        }
-
-        protected void onRelease(Message_1_0 message)
-        {
-            message.decrementReference();
-        }
-
-    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Thu Jan 23 11:01:02 2014
@@ -20,18 +20,17 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.amqp_1_0.type.Outcome;
 import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQQueue;
 
 import org.apache.qpid.server.txn.ServerTransaction;
 
-import java.util.Arrays;
-
 public class QueueDestination implements SendingDestination, ReceivingDestination
 {
+    private static final Logger _logger = Logger.getLogger(QueueDestination.class);
     private static final Accepted ACCEPTED = new Accepted();
     private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
 
@@ -80,7 +79,7 @@ public class QueueDestination implements
         }
         catch(Exception e)
         {
-            e.printStackTrace();
+            _logger.error("Send error", e);
             throw new RuntimeException(e);
         }
         return ACCEPTED;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Thu Jan 23 11:01:02 2014
@@ -43,6 +43,7 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.amqp_1_0.type.transport.Detach;
 import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -170,8 +171,8 @@ public class ReceivingLink_1_0 implement
 
             storedMessage.flushToStore();
 
-            Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession());
-
+            Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
+            MessageReference<Message_1_0> reference = message.newReference();
 
             Binary transactionId = null;
             org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState();
@@ -241,6 +242,8 @@ public class ReceivingLink_1_0 implement
                     }
                 });
             }
+
+            reference.release();
         }
     }
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Thu Jan 23 11:01:02 2014
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQSecurityException;
@@ -64,7 +66,6 @@ import org.apache.qpid.server.filter.JMS
 import org.apache.qpid.server.filter.SimpleFilterManager;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -72,6 +73,8 @@ import org.apache.qpid.server.virtualhos
 
 public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
 {
+    private static final Logger _logger = Logger.getLogger(SendingLink_1_0.class);
+
     private VirtualHost _vhost;
     private SendingDestination _destination;
 
@@ -319,7 +322,8 @@ public class SendingLink_1_0 implements 
                                                         }
                                                         catch (AMQException e)
                                                         {
-                                                            e.printStackTrace();  //TODO.
+                                                            //TODO
+                                                            _logger.error("Error removing queue", e);
                                                         }
                                                     }
                                                 }
@@ -342,14 +346,15 @@ public class SendingLink_1_0 implements 
             }
             catch (AMQSecurityException e)
             {
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                _logger.error("Security error", e);
             }
             catch (AMQInternalException e)
             {
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-            } catch (AMQException e)
+                _logger.error("Internal error", e);
+            }
+            catch (AMQException e)
             {
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                _logger.error("Error", e);
             }
             _subscription = new Subscription_1_0(this, qd, true);
 
@@ -370,7 +375,8 @@ public class SendingLink_1_0 implements 
             }
             catch (AMQException e)
             {
-                e.printStackTrace();  //TODO
+                //TODO
+                _logger.error("Error registering subscription", e);
             }
         }
 
@@ -398,7 +404,8 @@ public class SendingLink_1_0 implements 
             }
             catch (AMQException e)
             {
-                e.printStackTrace();  //TODO
+                //TODO
+                _logger.error("Error unregistering subscription", e);
             }
 
             Modified state = new Modified();
@@ -423,7 +430,8 @@ public class SendingLink_1_0 implements 
                 }
                 catch(AMQException e)
                 {
-                    e.printStackTrace();  // TODO - Implement
+                    //TODO
+                    _logger.error("Error removing queue", e);
                 }
             }
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Jan 23 11:01:02 2014
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 import java.text.MessageFormat;
+
+import org.apache.log4j.Logger;
 import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
@@ -39,13 +41,11 @@ import org.apache.qpid.AMQSecurityExcept
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -56,6 +56,7 @@ import static org.apache.qpid.server.log
 
 public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject
 {
+    private static final Logger _logger = Logger.getLogger(Session_1_0.class);
     private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
     private VirtualHost _vhost;
     private AutoCommitTransaction _transaction;
@@ -155,7 +156,7 @@ public class Session_1_0 implements Sess
                     }
                     catch(AmqpErrorException e)
                     {
-                        e.printStackTrace();
+                        _logger.error("Error creating sending link", e);
                         destination = null;
                         sendingLinkEndpoint.setSource(null);
                         error = e.getError();
@@ -355,7 +356,8 @@ public class Session_1_0 implements Sess
                                     }
                                     catch (AMQException e)
                                     {
-                                        e.printStackTrace();  //TODO.
+                                        //TODO
+                                        _logger.error("Error removing queue from vhost", e);
                                     }
                                 }
                             }
@@ -388,10 +390,13 @@ public class Session_1_0 implements Sess
         }
         catch (AMQSecurityException e)
         {
-            e.printStackTrace();  //TODO.
-        } catch (AMQException e)
+            //TODO
+            _logger.error("Security error", e);
+        }
+        catch (AMQException e)
         {
-            e.printStackTrace();  //TODO
+            //TODO
+            _logger.error("Error", e);
         }
 
         return queue;
@@ -533,10 +538,9 @@ public class Session_1_0 implements Sess
     }
 
     @Override
-    public boolean onSameConnection(InboundMessage inbound)
+    public Object getConnectionReference()
     {
-        // TODO
-        return false;
+        return getConnection().getReference();
     }
 
     @Override

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Thu Jan 23 11:01:02 2014
@@ -57,7 +57,8 @@ import org.apache.qpid.server.queue.Queu
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.ServerTransaction;
 
-class Subscription_1_0 implements Subscription
+class
+        Subscription_1_0 implements Subscription
 {
     private SendingLink_1_0 _link;
 
@@ -149,7 +150,7 @@ class Subscription_1_0 implements Subscr
     {
         if(entry.getMessage() instanceof Message_1_0)
         {
-            if(_noLocal && ((Message_1_0)entry.getMessage()).getSession() == getSession())
+            if(_noLocal && ((Message_1_0)entry.getMessage()).getConnectionReference() == getSession().getConnection().getReference())
             {
                 return false;
             }
@@ -164,7 +165,7 @@ class Subscription_1_0 implements Subscr
 
     private boolean checkFilters(final QueueEntry entry)
     {
-        return (_filters == null) || _filters.allAllow(entry);
+        return (_filters == null) || _filters.allAllow(entry.asFilterable());
     }
 
     public boolean isClosed()

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java Thu Jan 23 11:01:02 2014
@@ -20,13 +20,13 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
 import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
 import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
 import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
 import org.apache.qpid.amqp_1_0.type.messaging.*;
 import org.apache.qpid.amqp_1_0.type.transaction.Declare;
 import org.apache.qpid.amqp_1_0.type.transaction.Declared;
@@ -42,6 +42,7 @@ import java.util.*;
 
 public class TxnCoordinatorLink_1_0 implements ReceivingLinkListener, Link_1_0
 {
+    private static final Logger _logger = Logger.getLogger(TxnCoordinatorLink_1_0.class);
     private VirtualHost _vhost;
     private ReceivingLinkEndpoint _endpoint;
 
@@ -149,7 +150,8 @@ public class TxnCoordinatorLink_1_0 impl
         }
         catch (AmqpErrorException e)
         {
-            e.printStackTrace();  //TODO.
+            //TODO
+            _logger.error("AMQP error", e);
         }
 
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Thu Jan 23 11:01:02 2014
@@ -225,7 +225,6 @@ public class MessageConverter_0_10_to_0_
     {
         return new MessageMetaData(convertPublishBody(message),
                 convertContentHeaderBody(message, vhost),
-                1,
                 message.getArrivalTime());
     }
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Thu Jan 23 11:01:02 2014
@@ -121,7 +121,7 @@ public class MessageConverter_0_8_to_0_1
         body.flip();
 
         BasicContentHeaderProperties properties =
-                (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
+                  message_0_8.getContentHeaderBody().getProperties();
 
         final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
         if(exchange != null)

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java Thu Jan 23 11:01:02 2014
@@ -56,7 +56,7 @@ public class MessageConverter_0_8_to_1_0
         header.setDurable(serverMessage.isPersistent());
 
         BasicContentHeaderProperties contentHeader =
-                (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties();
+                  serverMessage.getContentHeaderBody().getProperties();
 
         header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));
         final long expiration = serverMessage.getExpiration();

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml Thu Jan 23 11:01:02 2014
@@ -41,6 +41,13 @@
       <scope>compile</scope>
     </dependency>
 
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j-version}</version>
+      <scope>compile</scope>
+    </dependency>
+
     <!-- test dependencies -->
     <dependency>
       <groupId>org.apache.qpid</groupId>

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml Thu Jan 23 11:01:02 2014
@@ -34,6 +34,13 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j-version}</version>
+      <scope>compile</scope>
+    </dependency>
+
     <!-- test dependencies -->
     <dependency>
       <groupId>org.apache.qpid</groupId>

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/pom.xml Thu Jan 23 11:01:02 2014
@@ -35,6 +35,20 @@
     </dependency>
 
     <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j-version}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j-version}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.geronimo.specs</groupId>
       <artifactId>geronimo-servlet_2.5_spec</artifactId>
       <version>1.2</version>

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-jmx/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-jmx/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-jmx/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-jmx/pom.xml Thu Jan 23 11:01:02 2014
@@ -41,6 +41,13 @@
       <scope>compile</scope>
     </dependency>
 
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j-version}</version>
+      <scope>compile</scope>
+    </dependency>
+
     <!-- test dependencies -->
     <dependency>
       <groupId>org.apache.qpid</groupId>

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker/pom.xml Thu Jan 23 11:01:02 2014
@@ -40,12 +40,12 @@
       <scope>compile</scope>
     </dependency>
 
-	<dependency>
-	  <groupId>log4j</groupId>
-	  <artifactId>log4j</artifactId>
-	  <version>1.2.16</version>
-	  <scope>compile</scope>
-	</dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j-version}</version>
+      <scope>compile</scope>
+    </dependency>
 
     <dependency>
       <groupId>commons-cli</groupId>

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/client/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/client/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/client/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/client/pom.xml Thu Jan 23 11:01:02 2014
@@ -43,7 +43,7 @@
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
-      <version>1.6.4</version>
+      <version>${slf4j-version}</version>
       <scope>compile</scope>
     </dependency>
 
@@ -54,13 +54,6 @@
       <version>0.26-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
-
-	<dependency>
-	  <groupId>log4j</groupId>
-	  <artifactId>log4j</artifactId>
-	  <version>1.2.16</version>
-	  <scope>test</scope>
-	</dependency>
   </dependencies>
 
   <build>

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Thu Jan 23 11:01:02 2014
@@ -30,7 +30,6 @@ import org.apache.qpid.client.CustomJMSX
 import org.apache.qpid.client.JMSAMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderProperties;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
 
@@ -73,7 +72,7 @@ public class AMQMessageDelegate_0_8 exte
     private static final boolean STRICT_AMQP_COMPLIANCE =
             Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
 
-    private ContentHeaderProperties _contentHeaderProperties;
+    private BasicContentHeaderProperties _contentHeaderProperties;
 
     // The base set of items that needs to be set. 
     private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
@@ -81,7 +80,7 @@ public class AMQMessageDelegate_0_8 exte
         super(deliveryTag);
         _contentHeaderProperties = properties;
         _readableProperties = (_contentHeaderProperties != null);
-        _headerAdapter = new JMSHeaderAdapter(_readableProperties ? ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()
+        _headerAdapter = new JMSHeaderAdapter(_readableProperties ? _contentHeaderProperties.getHeaders()
                                                                   : (new BasicContentHeaderProperties()).getHeaders() );
     }
 
@@ -90,7 +89,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         this(new BasicContentHeaderProperties(), -1);
         _readableProperties = false;
-        _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+        _headerAdapter = new JMSHeaderAdapter(_contentHeaderProperties.getHeaders());
 
     }
 
@@ -337,7 +336,7 @@ public class AMQMessageDelegate_0_8 exte
 
     public BasicContentHeaderProperties getContentHeaderProperties()
     {
-        return (BasicContentHeaderProperties) _contentHeaderProperties;
+        return _contentHeaderProperties;
     }
 
 
@@ -443,7 +442,7 @@ public class AMQMessageDelegate_0_8 exte
         //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
         if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
         {
-            return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
+            return _contentHeaderProperties.getUserIdAsString();
         }
         else
         {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Thu Jan 23 11:01:02 2014
@@ -101,7 +101,7 @@ public abstract class AbstractJMSMessage
         }
 
         AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
-                                                                 (BasicContentHeaderProperties) contentHeader.getProperties(),
+                                                                 contentHeader.getProperties(),
                                                                  exchange, routingKey, queueDestinationCache, topicDestinationCache);
 
         return createMessage(delegate, data);

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Thu Jan 23 11:01:02 2014
@@ -110,7 +110,7 @@ public class MessageFactoryRegistry
                                             AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
             throws AMQException, JMSException
     {
-        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties();
+        BasicContentHeaderProperties properties = contentHeader.getProperties();
 
         // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
         // AMQP. When the type is null, it can only be assumed that the message is a byte message.

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java Thu Jan 23 11:01:02 2014
@@ -179,12 +179,9 @@ public class ClientConnectionDelegate ex
     }
 
     @Override
-    public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+    public void connectionHeartbeat(Connection conn, ConnectionHeartbeat heartbeat)
     {
-        // ClientDelegate simply responds to heartbeats with heartbeats
         _heartbeatListener.heartbeatReceived();
-        super.connectionHeartbeat(conn, hearbeat);
-        _heartbeatListener.heartbeatSent();
     }
 
 
@@ -192,4 +189,11 @@ public class ClientConnectionDelegate ex
     {
         _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
     }
+
+    @Override
+    public void writerIdle(final Connection connection)
+    {
+        super.writerIdle(connection);
+        _heartbeatListener.heartbeatSent();
+    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java Thu Jan 23 11:01:02 2014
@@ -179,7 +179,7 @@ public class AMQProtocolHandlerTest exte
                 }
                 catch (Exception e)
                 {
-                    e.printStackTrace();
+                    _logger.error(e.getMessage(), e);
                     fail(e.getMessage());
                 }
             }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java Thu Jan 23 11:01:02 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.test.unit.messag
 
 import junit.framework.TestCase;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
@@ -42,7 +43,7 @@ import javax.jms.TextMessage;
 
 public class MessageConverterTest extends TestCase
 {
-
+    private static final Logger _logger = Logger.getLogger(MessageConverterTest.class);
     public static final String JMS_CORR_ID = "QPIDID_01";
     public static final int JMS_DELIV_MODE = 1;
     public static final String JMS_TYPE = "test.jms.type";
@@ -134,8 +135,8 @@ public class MessageConverterTest extend
         }
         catch (JMSException e)
         {
+            _logger.error("An error occured testing the property values", e);
             fail("An error occured testing the property values" + e.getCause());
-            e.printStackTrace();
         }
     }
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/common/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/common/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/common/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/common/pom.xml Thu Jan 23 11:01:02 2014
@@ -34,12 +34,12 @@
   </properties>
 
   <dependencies>
-	<dependency>
-	  <groupId>org.slf4j</groupId>
-	  <artifactId>slf4j-api</artifactId>
-	  <version>1.6.4</version>
-	  <scope>compile</scope>
-	</dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j-version}</version>
+      <scope>compile</scope>
+    </dependency>
 
     <!-- test dependencies -->
     <dependency>
@@ -49,12 +49,6 @@
       <scope>test</scope>
     </dependency>
 
-	<dependency>
-	  <groupId>log4j</groupId>
-	  <artifactId>log4j</artifactId>
-	  <version>1.2.16</version>
-	  <scope>test</scope>
-	</dependency>
   </dependencies>
    
   <build>

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Thu Jan 23 11:01:02 2014
@@ -27,7 +27,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-public class BasicContentHeaderProperties implements CommonContentHeaderProperties
+public class BasicContentHeaderProperties
 {
     //persistent & non-persistent constants, values as per JMS DeliveryMode
     public static final int NON_PERSISTENT = 1;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Thu Jan 23 11:01:02 2014
@@ -39,7 +39,7 @@ public class ContentHeaderBody implement
     private long bodySize;
 
     /** must never be null */
-    private ContentHeaderProperties properties;
+    private BasicContentHeaderProperties properties;
 
     public ContentHeaderBody()
     {
@@ -57,13 +57,13 @@ public class ContentHeaderBody implement
     }
 
 
-    public ContentHeaderBody(ContentHeaderProperties props, int classId)
+    public ContentHeaderBody(BasicContentHeaderProperties props, int classId)
     {
         properties = props;
         this.classId = classId;
     }
 
-    public ContentHeaderBody(int classId, int weight, ContentHeaderProperties props, long bodySize)
+    public ContentHeaderBody(int classId, int weight, BasicContentHeaderProperties props, long bodySize)
     {
         this(props, classId);
         this.weight = weight;
@@ -121,12 +121,12 @@ public class ContentHeaderBody implement
         return new AMQFrame(channelId, body);
     }
 
-    public ContentHeaderProperties getProperties()
+    public BasicContentHeaderProperties getProperties()
     {
         return properties;
     }
 
-    public void setProperties(ContentHeaderProperties props)
+    public void setProperties(BasicContentHeaderProperties props)
     {
         properties = props;
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java Thu Jan 23 11:01:02 2014
@@ -38,11 +38,11 @@ public class ContentHeaderPropertiesFact
     {
     }
 
-    public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
+    public BasicContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
                                                                  DataInput buffer, int size)
              throws AMQFrameDecodingException, IOException
     {
-        ContentHeaderProperties properties;
+        BasicContentHeaderProperties properties;
         // AMQP version change: "Hardwired" version to major=8, minor=0
         // TODO: Change so that the actual version is obtained from
         // the ProtocolInitiation object for this session.

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java Thu Jan 23 11:01:02 2014
@@ -21,14 +21,10 @@
 
 package org.apache.qpid.framing.abstraction;
 
-import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQMethodBody;
 
-public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
+public interface ProtocolVersionMethodConverter
 {
-    AMQBody convertToBody(ContentChunk contentBody);
-    ContentChunk convertToContentChunk(AMQBody body);
-
-    void configure();
-
-    AMQBody convertToBody(byte[] input);
+    public MessagePublishInfo convertToInfo(AMQMethodBody body);
+    public AMQMethodBody convertToBody(MessagePublishInfo info);
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Thu Jan 23 11:01:02 2014
@@ -21,13 +21,10 @@
 
 package org.apache.qpid.framing.amqp_0_9;
 
-import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -35,48 +32,12 @@ import org.apache.qpid.framing.abstracti
 
 public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
 {
-    private int _basicPublishClassId;
-    private int _basicPublishMethodId;
 
     public MethodConverter_0_9()
     {
         super((byte)0,(byte)9);
-
-
-    }
-
-    public AMQBody convertToBody(ContentChunk contentChunk)
-    {
-        if(contentChunk instanceof ContentChunk_0_9)
-        {
-            return ((ContentChunk_0_9)contentChunk).toBody();
-        }
-        else
-        {
-            return new ContentBody(contentChunk.getData());
-        }
-    }
-
-    public ContentChunk convertToContentChunk(AMQBody body)
-    {
-        final ContentBody contentBodyChunk = (ContentBody) body;
-
-        return new ContentChunk_0_9(contentBodyChunk);
-
     }
 
-    public void configure()
-    {
-
-        _basicPublishClassId = org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl.CLASS_ID;
-        _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
-
-    }
-
-    public AMQBody convertToBody(byte[] data)
-    {
-        return new ContentBody(data);
-    }
 
     public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
     {
@@ -103,33 +64,4 @@ public class MethodConverter_0_9 extends
 
     }
 
-    private static class ContentChunk_0_9 implements ContentChunk
-    {
-        private final ContentBody _contentBodyChunk;
-
-        public ContentChunk_0_9(final ContentBody contentBodyChunk)
-        {
-            _contentBodyChunk = contentBodyChunk;
-        }
-
-        public int getSize()
-        {
-            return _contentBodyChunk.getSize();
-        }
-
-        public byte[] getData()
-        {
-            return _contentBodyChunk.getPayload();
-        }
-
-        public void reduceToFit()
-        {
-            _contentBodyChunk.reduceBufferToFit();
-        }
-
-        public AMQBody toBody()
-        {
-            return _contentBodyChunk;
-        }
-    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org