You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC

svn commit: r1451244 [20/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...

Modified: qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java Thu Feb 28 16:14:30 2013
@@ -25,9 +25,8 @@ import org.apache.qpid.amqp_1_0.transpor
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import javax.jms.Queue;
+import java.util.*;
 
 public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
 {
@@ -43,16 +42,26 @@ public class ConnectionImpl implements C
     private boolean _isQueueConnection;
     private boolean _isTopicConnection;
     private final Collection<CloseTask> _closeTasks = new ArrayList<CloseTask>();
+    private final String _host;
+    private final int _port;
+    private final String _username;
+    private final String _password;
+    private final String _remoteHost;
+    private final boolean _ssl;
+    private String _clientId;
+    private String _queuePrefix;
+    private String _topicPrefix;
 
 
     private static enum State
     {
+        UNCONNECTED,
         STOPPED,
         STARTED,
         CLOSED
     }
 
-    private volatile State _state = State.STOPPED;
+    private volatile State _state = State.UNCONNECTED;
 
     public ConnectionImpl(String host, int port, String username, String password, String clientId) throws JMSException
     {
@@ -66,20 +75,52 @@ public class ConnectionImpl implements C
 
     public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl) throws JMSException
     {
-        Container container = clientId == null ? new Container() : new Container(clientId);
-        // TODO - authentication, containerId, clientId, ssl?, etc
-        try
+        _host = host;
+        _port = port;
+        _username = username;
+        _password = password;
+        _clientId = clientId;
+        _remoteHost = remoteHost;
+        _ssl = ssl;
+    }
+
+    private void connect() throws JMSException
+    {
+        synchronized(_lock)
         {
-            _conn = new org.apache.qpid.amqp_1_0.client.Connection(host, port, username, password, container, remoteHost, ssl);
-            // TODO - retrieve negotiated AMQP version
-            _connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
+            // already connected?
+            if( _state == State.UNCONNECTED )
+            {
+                _state = State.STOPPED;
+
+                Container container = _clientId == null ? new Container() : new Container(_clientId);
+                // TODO - authentication, containerId, clientId, ssl?, etc
+                try
+                {
+                    _conn = new org.apache.qpid.amqp_1_0.client.Connection(_host,
+                            _port, _username, _password, container, _remoteHost, _ssl);
+                    // TODO - retrieve negotiated AMQP version
+                    _connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
+                }
+                catch (org.apache.qpid.amqp_1_0.client.Connection.ConnectionException e)
+                {
+                    JMSException jmsEx = new JMSException(e.getMessage());
+                    jmsEx.setLinkedException(e);
+                    jmsEx.initCause(e);
+                    throw jmsEx;
+                }
+            }
         }
-        catch (org.apache.qpid.amqp_1_0.client.Connection.ConnectionException e)
+    }
+
+    private void checkNotConnected(String msg) throws IllegalStateException
+    {
+        synchronized(_lock)
         {
-            JMSException jmsEx = new JMSException(e.getMessage());
-            jmsEx.setLinkedException(e);
-            jmsEx.initCause(e);
-            throw jmsEx;
+            if( _state != State.UNCONNECTED )
+            {
+                throw new IllegalStateException(msg);
+            }
         }
     }
 
@@ -111,7 +152,7 @@ public class ConnectionImpl implements C
             {
                 throw new IllegalStateException("Cannot create a session on a closed connection");
             }
-
+            connect();
             SessionImpl session = new SessionImpl(this, acknowledgeMode);
             session.setQueueSession(_isQueueConnection);
             session.setTopicSession(_isTopicConnection);
@@ -125,14 +166,19 @@ public class ConnectionImpl implements C
     public String getClientID() throws JMSException
     {
         checkClosed();
-        return _conn.getEndpoint().getContainer().getId();
+        return _clientId;
     }
 
-    public void setClientID(final String s) throws JMSException
+    public void setClientID(final String value) throws JMSException
     {
-        throw new IllegalStateException("Cannot set client-id to \""
-                                        + s
-                                        + "\"; client-id must be set on connection creation");
+        checkNotConnected("Cannot set client-id to \""
+                                        + value
+                                        + "\"; client-id must be set before the connection is used");
+        if( _clientId !=null )
+        {
+            throw new IllegalStateException("client-id has already been set");
+        }
+        _clientId = value;
     }
 
     public ConnectionMetaData getMetaData() throws JMSException
@@ -158,6 +204,7 @@ public class ConnectionImpl implements C
         synchronized(_lock)
         {
             checkClosed();
+            connect();
             if(_state == State.STOPPED)
             {
                 // TODO
@@ -187,6 +234,7 @@ public class ConnectionImpl implements C
                     {
                         session.stop();
                     }
+                case UNCONNECTED:
                     _state = State.STOPPED;
                     break;
                 case CLOSED:
@@ -235,7 +283,9 @@ public class ConnectionImpl implements C
                 {
                     task.onClose();
                 }
-                _conn.close();
+                if(_conn != null && _state != State.UNCONNECTED ) {
+                    _conn.close();
+                }
                 _state = State.CLOSED;
             }
 
@@ -282,6 +332,10 @@ public class ConnectionImpl implements C
                                                               final int i) throws JMSException
     {
         checkClosed();
+        if (_isQueueConnection)
+        {
+            throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
+        } 
         return null;  //TODO
     }
 
@@ -326,4 +380,78 @@ public class ConnectionImpl implements C
     {
         _isTopicConnection = topicConnection;
     }
+
+    public String getTopicPrefix()
+    {
+        return _topicPrefix;
+    }
+
+    public void setTopicPrefix(String topicPrefix)
+    {
+        _topicPrefix = topicPrefix;
+    }
+
+    public String getQueuePrefix()
+    {
+        return _queuePrefix;
+    }
+
+    public void setQueuePrefix(String queueprefix)
+    {
+        _queuePrefix = queueprefix;
+    }
+
+    DecodedDestination toDecodedDestination(DestinationImpl dest)
+    {
+        String address = dest.getAddress();
+        Set<String> kind = null;
+        Class clazz = dest.getClass();
+        if( clazz==QueueImpl.class )
+        {
+            kind = MessageImpl.JMS_QUEUE_ATTRIBUTES;
+            if( _queuePrefix!=null )
+            {
+                // Avoid double prefixing..
+                if( !address.startsWith(_queuePrefix) )
+                {
+                    address = _queuePrefix+address;
+                }
+            }
+        }
+        else if( clazz==TopicImpl.class )
+        {
+            kind = MessageImpl.JMS_TOPIC_ATTRIBUTES;
+            if( _topicPrefix!=null )
+            {
+                // Avoid double prefixing..
+                if( !address.startsWith(_topicPrefix) )
+                {
+                    address = _topicPrefix+address;
+                }
+            }
+        }
+        else if( clazz==TemporaryQueueImpl.class )
+        {
+            kind = MessageImpl.JMS_TEMP_QUEUE_ATTRIBUTES;
+        }
+        else if( clazz==TemporaryTopicImpl.class )
+        {
+            kind = MessageImpl.JMS_TEMP_TOPIC_ATTRIBUTES;
+        }
+        return new DecodedDestination(address, kind);
+    }
+
+    DecodedDestination toDecodedDestination(String address, Set<String> kind)
+    {
+        if( (kind == null || kind.equals(MessageImpl.JMS_QUEUE_ATTRIBUTES)) && _queuePrefix!=null && address.startsWith(_queuePrefix))
+        {
+            return new DecodedDestination(address.substring(_queuePrefix.length()), MessageImpl.JMS_QUEUE_ATTRIBUTES);
+        }
+        if( (kind == null || kind.equals(MessageImpl.JMS_TOPIC_ATTRIBUTES)) && _topicPrefix!=null && address.startsWith(_topicPrefix))
+        {
+            return new DecodedDestination(address.substring(_topicPrefix.length()), MessageImpl.JMS_TOPIC_ATTRIBUTES);
+        }
+        return new DecodedDestination(address, kind);
+    }
+
 }

Modified: qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java Thu Feb 28 16:14:30 2013
@@ -127,7 +127,7 @@ public class MessageConsumerImpl impleme
     {
         try
         {
-            return _session.getClientSession(). createReceiver(_destination.getAddress(), AcknowledgeMode.ALO,
+            return _session.getClientSession(). createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO,
                                                                _linkName, _durable, getFilters(), null);
         }
         catch (AmqpErrorException e)
@@ -316,9 +316,9 @@ public class MessageConsumerImpl impleme
         _lastUnackedMessage = deliveryTag;
     }
 
-    void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg) throws IllegalStateException
+    void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg)
     {
-        final int acknowledgeMode = _session.getAcknowledgeMode();
+        int acknowledgeMode = _session.getAckModeEnum().ordinal();
 
         if(acknowledgeMode == Session.AUTO_ACKNOWLEDGE
            || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE

Modified: qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java Thu Feb 28 16:14:30 2013
@@ -50,14 +50,24 @@ public abstract class MessageImpl implem
     static final Set<Class> _supportedClasses =
                 new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
                                                  Float.class, Double.class, Character.class, String.class, byte[].class));
-    private static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
+    static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
+    static final Symbol TO_TYPE = Symbol.valueOf("x-opt-to-type");
+    static final Symbol REPLY_TO_TYPE = Symbol.valueOf("x-opt-reply-type");
+
+    static final String QUEUE_ATTRIBUTE = "queue";
+    static final String TOPIC_ATTRIBUTE = "topic";
+    static final String TEMPORARY_ATTRIBUTE = "temporary";
+
+    static final Set<String> JMS_QUEUE_ATTRIBUTES = set(QUEUE_ATTRIBUTE);
+    static final Set<String> JMS_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE);
+    static final Set<String> JMS_TEMP_QUEUE_ATTRIBUTES = set(QUEUE_ATTRIBUTE, TEMPORARY_ATTRIBUTE);
+    static final Set<String> JMS_TEMP_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE, TEMPORARY_ATTRIBUTE);
 
     private Header _header;
     private Properties _properties;
     private ApplicationProperties _applicationProperties;
     private Footer _footer;
-    public static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
-    private SessionImpl _sessionImpl;
+    private final SessionImpl _sessionImpl;
     private boolean _readOnly;
     private MessageAnnotations _messageAnnotations;
 
@@ -171,45 +181,53 @@ public abstract class MessageImpl implem
 
     public DestinationImpl getJMSReplyTo() throws JMSException
     {
-        return DestinationImpl.valueOf(getReplyTo());
+        return toDestination(getReplyTo(), splitCommaSeparateSet((String) getMessageAnnotation(REPLY_TO_TYPE)));
     }
 
     public void setJMSReplyTo(Destination destination) throws NonAMQPDestinationException
     {
-        if(destination == null)
+        if( destination==null )
         {
             setReplyTo(null);
-        }
-        else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination)
-        {
-            setReplyTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress());
+            messageAnnotationMap().remove(REPLY_TO_TYPE);
         }
         else
         {
-            throw new NonAMQPDestinationException(destination);
+            DecodedDestination dd = toDecodedDestination(destination);
+            setReplyTo(dd.getAddress());
+            messageAnnotationMap().put(REPLY_TO_TYPE, join(",", dd.getAttributes()));
         }
     }
 
     public DestinationImpl getJMSDestination() throws JMSException
     {
-        return _isFromQueue ? QueueImpl.valueOf(getTo())
-                            : _isFromTopic ? TopicImpl.valueOf(getTo())
-                                           : DestinationImpl.valueOf(getTo());
+        Set<String> type = splitCommaSeparateSet((String) getMessageAnnotation(TO_TYPE));
+        if( type==null )
+        {
+            if( _isFromQueue )
+            {
+                type = JMS_QUEUE_ATTRIBUTES;
+            }
+            else if( _isFromTopic )
+            {
+                type = JMS_TOPIC_ATTRIBUTES;
+            }
+        }
+        return toDestination(getTo(), type);
     }
 
     public void setJMSDestination(Destination destination) throws NonAMQPDestinationException
     {
-        if(destination == null)
+        if( destination==null )
         {
             setTo(null);
-        }
-        else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination)
-        {
-            setTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress());
+            messageAnnotationMap().remove(TO_TYPE);
         }
         else
         {
-            throw new NonAMQPDestinationException(destination);
+            DecodedDestination dd = toDecodedDestination(destination);
+            setTo(dd.getAddress());
+            messageAnnotationMap().put(TO_TYPE, join(",", dd.getAttributes()));
         }
     }
 
@@ -264,22 +282,13 @@ public abstract class MessageImpl implem
 
     public String getJMSType() throws JMSException
     {
-        Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
-        final Object attrValue = messageAttrs == null ? null : messageAttrs.get(JMS_TYPE);
-
+        final Object attrValue = getMessageAnnotation(JMS_TYPE);
         return attrValue instanceof String ? attrValue.toString() : null;
     }
 
     public void setJMSType(String s) throws JMSException
     {
-        Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
-        if(messageAttrs == null)
-        {
-            messageAttrs = new HashMap();
-            _messageAnnotations = new MessageAnnotations(messageAttrs);
-        }
-
-        messageAttrs.put(JMS_TYPE, s);
+        messageAnnotationMap().put(JMS_TYPE, s);
     }
 
     public long getJMSExpiration() throws JMSException
@@ -1206,4 +1215,118 @@ public abstract class MessageImpl implem
     }
 
     abstract Collection<Section> getSections();
+
+    DecodedDestination toDecodedDestination(Destination destination) throws NonAMQPDestinationException
+    {
+        if(destination == null)
+        {
+            return null;
+        }
+        if (destination instanceof DestinationImpl)
+        {
+            return _sessionImpl.getConnection().toDecodedDestination((DestinationImpl) destination);
+        }
+        throw new NonAMQPDestinationException(destination);
+    }
+
+    DestinationImpl toDestination(String address, Set<String> kind)
+    {
+        if( address == null )
+        {
+            return null;
+        }
+
+        // If destination prefixes are in play, we have to strip the the prefix, and we might
+        // be able to infer the kind, if we don't know it yet.
+        DecodedDestination decoded = _sessionImpl.getConnection().toDecodedDestination(address, kind);
+        address = decoded.getAddress();
+        kind = decoded.getAttributes();
+
+        if( kind == null )
+        {
+            return DestinationImpl.valueOf(address);
+        }
+        if( kind.contains(QUEUE_ATTRIBUTE) )
+        {
+            if( kind.contains(TEMPORARY_ATTRIBUTE) )
+            {
+                return new TemporaryQueueImpl(address, null, _sessionImpl);
+            }
+            else
+            {
+                return QueueImpl.valueOf(address);
+            }
+        }
+        else if ( kind.contains(TOPIC_ATTRIBUTE) )
+        {
+            if( kind.contains(TEMPORARY_ATTRIBUTE) )
+            {
+                return new TemporaryTopicImpl(address, null, _sessionImpl);
+            }
+            else
+            {
+                return TopicImpl.valueOf(address);
+            }
+        }
+
+        return DestinationImpl.valueOf(address);
+    }
+
+    private Object getMessageAnnotation(Symbol key)
+    {
+        Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+        return messageAttrs == null ? null : messageAttrs.get(key);
+    }
+
+    private Map messageAnnotationMap()
+    {
+        Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+        if(messageAttrs == null)
+        {
+            messageAttrs = new HashMap();
+            _messageAnnotations = new MessageAnnotations(messageAttrs);
+        }
+        return messageAttrs;
+    }
+
+    Set<String> splitCommaSeparateSet(String value)
+    {
+        if( value == null )
+        {
+            return null;
+        }
+        HashSet<String> rc = new HashSet<String>();
+        for( String x: value.split("\\s*,\\s*") )
+        {
+            rc.add(x);
+        }
+        return rc;
+    }
+
+    private static Set<String> set(String ...args)
+    {
+        HashSet<String> s = new HashSet<String>();
+        for (String arg : args)
+        {
+            s.add(arg);
+        }
+        return Collections.unmodifiableSet(s);
+    }
+
+    static final String join(String sep, Iterable items)
+    {
+        StringBuilder result = new StringBuilder();
+
+        for (Object o : items)
+        {
+            if (result.length() > 0)
+            {
+                result.append(sep);
+            }
+            result.append(o.toString());
+        }
+
+        return result.toString();
+    }
+
 }

Modified: qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java Thu Feb 28 16:14:30 2013
@@ -20,7 +20,6 @@ package org.apache.qpid.amqp_1_0.jms.imp
 
 import org.apache.qpid.amqp_1_0.client.Sender;
 import org.apache.qpid.amqp_1_0.jms.MessageProducer;
-import org.apache.qpid.amqp_1_0.jms.Queue;
 import org.apache.qpid.amqp_1_0.jms.QueueSender;
 import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
 import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
@@ -61,7 +60,7 @@ public class MessageProducerImpl impleme
         {
             try
             {
-                _sender = _session.getClientSession().createSender(_destination.getAddress());
+                _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
             }
             catch (Sender.SenderCreationException e)
             {
@@ -297,7 +296,7 @@ public class MessageProducerImpl impleme
             try
             {
                 _destination = (DestinationImpl) destination;
-                _sender = _session.getClientSession().createSender(_destination.getAddress());
+                _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
 
                 send(message, deliveryMode, priority, ttl);
 

Modified: qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java Thu Feb 28 16:14:30 2013
@@ -40,7 +40,25 @@ public class ObjectMessageImpl extends M
 {
     static final Symbol CONTENT_TYPE = Symbol.valueOf("application/x-java-serialized-object");
 
-    private Data _objectData;
+    static final Data NULL_OBJECT_DATA;
+    static 
+    {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try
+        {
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(null);
+            oos.flush();
+            oos.close();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        NULL_OBJECT_DATA = new Data(new Binary(baos.toByteArray()));
+    }
+
+    private Data _objectData = NULL_OBJECT_DATA;
 
     protected ObjectMessageImpl(Header header,
                                 MessageAnnotations messageAnnotations,

Modified: qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java Thu Feb 28 16:14:30 2013
@@ -18,9 +18,7 @@
  */
 package org.apache.qpid.amqp_1_0.jms.impl;
 
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
+import java.util.*;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
@@ -29,6 +27,7 @@ import org.apache.qpid.amqp_1_0.client.R
 import org.apache.qpid.amqp_1_0.jms.QueueBrowser;
 import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
 import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
 import org.apache.qpid.amqp_1_0.type.messaging.Filter;
 import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
 import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
@@ -39,49 +38,27 @@ public class QueueBrowserImpl implements
     private static final String JMS_SELECTOR = "jms-selector";
     private QueueImpl _queue;
     private String _selector;
-    private Receiver _receiver;
-    private Message _nextElement;
-    private MessageEnumeration _enumeration;
+    private final SessionImpl _session;
+    private Map<Symbol, Filter> _filters;
+    private HashSet<MessageEnumeration> _enumerations = new HashSet<MessageEnumeration>();
+    private boolean _closed;
 
     QueueBrowserImpl(final QueueImpl queue, final String selector, SessionImpl session) throws JMSException
     {
         _queue = queue;
         _selector = selector;
+        _session = session;
 
 
-        Map<Symbol, Filter> filters;
         if(selector == null || selector.trim().equals(""))
         {
-            filters = null;
+            _filters = null;
         }
         else
         {
-            filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector));
-        }
-
-
-        try
-        {
-            _receiver = session.getClientSession().createReceiver(queue.getAddress(),
-                                                                  StdDistMode.COPY,
-                                                                  AcknowledgeMode.AMO,null,
-                                                                  false,
-                                                                  filters, null);
-            _nextElement = _receiver.receive(0L);
-            _enumeration = new MessageEnumeration();
-        }
-        catch(AmqpErrorException e)
-        {
-            org.apache.qpid.amqp_1_0.type.transport.Error error = e.getError();
-            if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
-            {
-                throw new InvalidSelectorException(e.getMessage());
-            }
-            else
-            {
-                throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
-            }
-
+            _filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector));
+            // We do this just to have the server validate the filter..
+            new MessageEnumeration().close();
         }
     }
 
@@ -97,42 +74,124 @@ public class QueueBrowserImpl implements
 
     public Enumeration getEnumeration() throws JMSException
     {
-        if(_enumeration == null)
+        if(_closed)
         {
             throw new IllegalStateException("Browser has been closed");
         }
-        return _enumeration;
+        return new MessageEnumeration();
     }
 
     public void close() throws JMSException
     {
-        _receiver.close();
-        _enumeration = null;
+        _closed = true;
+        for(MessageEnumeration me : new ArrayList<MessageEnumeration>(_enumerations))
+        {
+            me.close();
+        }
     }
 
-    private final class MessageEnumeration implements Enumeration<Message>
+    private final class MessageEnumeration implements Enumeration<MessageImpl>
     {
+        private Receiver _receiver;
+        private MessageImpl _nextElement;
+        private boolean _needNext = true;
+
+        MessageEnumeration() throws JMSException
+        {
+            try
+            {
+                _receiver = _session.getClientSession().createReceiver(_session.toAddress(_queue),
+                        StdDistMode.COPY,
+                        AcknowledgeMode.AMO, null,
+                        false,
+                        _filters, null);
+                _receiver.setCredit(UnsignedInteger.valueOf(100), true);
+            }
+            catch(AmqpErrorException e)
+            {
+                org.apache.qpid.amqp_1_0.type.transport.Error error = e.getError();
+                if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
+                {
+                    throw new InvalidSelectorException(e.getMessage());
+                }
+                else
+                {
+                    throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+                }
+
+            }
+            _enumerations.add(this);
+
+        }
+
+        public void close()
+        {
+            _enumerations.remove(this);
+            _receiver.close();
+            _receiver = null;
+        }
 
         @Override
         public boolean hasMoreElements()
         {
+            if( _receiver == null )
+            {
+                return false;
+            }
+            if( _needNext )
+            {
+                _needNext = false;
+                _nextElement = createJMSMessage(_receiver.receive(0L));
+                if( _nextElement == null )
+                {
+                    // Drain to verify there really are no more messages.
+                    _receiver.drain();
+                    _receiver.drainWait();
+                    _nextElement = createJMSMessage(_receiver.receive(0L));
+                    if( _nextElement == null )
+                    {
+                        close();
+                    }
+                    else
+                    {
+                        // there are still more messages, open up the credit window again..
+                        _receiver.clearDrain();
+                    }
+                }
+            }
             return _nextElement != null;
         }
 
         @Override
-        public Message nextElement()
+        public MessageImpl nextElement()
         {
-
-            Message message = _nextElement;
-            if(message == null)
+            if( hasMoreElements() )
             {
-                message = _receiver.receive(0l);
+                MessageImpl message = _nextElement;
+                _nextElement = null;
+                _needNext = true;
+                return message;
             }
-            if(message != null)
+            else
             {
-                _nextElement = _receiver.receive(0l);
+                throw new NoSuchElementException();
             }
+        }
+    }
+
+    MessageImpl createJMSMessage(final Message msg)
+    {
+        if(msg != null)
+        {
+            final MessageImpl message = _session.getMessageFactory().createMessage(_queue, msg);
+            message.setFromQueue(true);
+            message.setFromTopic(false);
             return message;
         }
+        else
+        {
+            return null;
+        }
     }
+
 }

Modified: qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java Thu Feb 28 16:14:30 2013
@@ -41,7 +41,7 @@ public class QueueReceiverImpl extends M
     {
         try
         {
-            return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
+            return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination()));
         }
         catch (AmqpErrorException e)
         {

Modified: qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java Thu Feb 28 16:14:30 2013
@@ -766,7 +766,7 @@ public class SessionImpl implements Sess
             {
                 while(!_closed)
                 {
-                    while(!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty()))
+                    while(!_closed && (!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty())))
                     {
                         try
                         {
@@ -777,7 +777,7 @@ public class SessionImpl implements Sess
                             return;
                         }
                     }
-                    while(_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty()))
+                    while(!_closed && (_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty())))
                     {
                         Message msg;
 
@@ -804,6 +804,10 @@ public class SessionImpl implements Sess
 
                         if(message != null)
                         {
+                            if(_acknowledgeMode == AcknowledgeMode.CLIENT_ACKNOWLEDGE)
+                            {
+                                consumer.setLastUnackedMessage(msg.getDeliveryTag());
+                            }
                             _currentConsumer = consumer;
                             _currentMessage = msg;
                             try
@@ -816,11 +820,11 @@ public class SessionImpl implements Sess
                                 _currentMessage = null;
                             }
 
-                            if((_recoveredMessage == null) && (_acknowledgeMode == AcknowledgeMode.AUTO_ACKNOWLEDGE
-                               || _acknowledgeMode == AcknowledgeMode.DUPS_OK_ACKNOWLEDGE))
+                            if(_recoveredMessage == null)
                             {
-                                consumer.acknowledge(msg);
+                                consumer.preReceiveAction(msg);
                             }
+
                         }
 
                     }
@@ -895,4 +899,10 @@ public class SessionImpl implements Sess
     {
         _isTopicSession = topicSession;
     }
+
+    String toAddress(DestinationImpl dest)
+    {
+        return _connection.toDecodedDestination(dest).getAddress();
+    }
+
 }

Modified: qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java Thu Feb 28 16:14:30 2013
@@ -66,7 +66,7 @@ public class TopicSubscriberImpl extends
     {
         try
         {
-            String address = getDestination().getAddress();
+            String address = getSession().toAddress(getDestination());
             Receiver receiver = getSession().getClientSession().createReceiver(address,
                                                                                StdDistMode.COPY, AcknowledgeMode.ALO,
                                                                                getLinkName(), isDurable(), getFilters(),

Modified: qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java Thu Feb 28 16:14:30 2013
@@ -57,6 +57,9 @@ public class PropertiesFileInitialContex
         Map data = new ConcurrentHashMap();
 
         String file = null;
+        String fileName = (environment.containsKey(Context.PROVIDER_URL))
+                            ? (String)environment.get(Context.PROVIDER_URL) : System.getProperty(Context.PROVIDER_URL);
+
         try
         {
 

Modified: qpid/branches/asyncstore/java/amqp-1-0-client/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-client/build.xml?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-client/build.xml (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-client/build.xml Thu Feb 28 16:14:30 2013
@@ -23,6 +23,9 @@
   <property name="module.genpom" value="true"/>
   <property name="module.depends" value="amqp-1-0-common"/>
 
+  <property name="example.src.dir" value="${project.root}/amqp-1-0-client/example/src/main/java" />
+  <property name="example.jar.file" value="${build.lib}/qpid-amqp-1-0-client-example-${project.version}.jar" />
+
 
   <import file="../module.xml"/>
 

Modified: qpid/branches/asyncstore/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java Thu Feb 28 16:14:30 2013
@@ -20,6 +20,15 @@
  */
 package org.apache.qpid.amqp_1_0.client;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.net.ssl.SSLSocketFactory;
 import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
 import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
 import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
@@ -30,17 +39,6 @@ import org.apache.qpid.amqp_1_0.type.Fra
 import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
 import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
 
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLSocketFactory;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
 public class Connection
 {
     private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
@@ -224,7 +222,6 @@ public class Connection
             }
 
 
-            //ConnectionHandler.OutputHandler outputHandler = new ConnectionHandler.OutputHandler(outputStream, out, _conn.getDescribedTypeRegistry());
             ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
             Thread outputThread = new Thread(outputHandler);
             outputThread.setDaemon(true);
@@ -236,8 +233,6 @@ public class Connection
             final ConnectionHandler handler = new ConnectionHandler(_conn);
             final InputStream inputStream = s.getInputStream();
 
-            //final AMQPTransport transport = new AMQPTransport(new AMQPFrameTransport(_conn));
-
             Thread inputThread = new Thread(new Runnable()
             {
 
@@ -246,7 +241,6 @@ public class Connection
                     try
                     {
                         doRead(handler, inputStream);
-//                        doRead(transport, inputStream);
                     }
                     finally
                     {
@@ -268,85 +262,6 @@ public class Connection
             inputThread.setDaemon(true);
             inputThread.start();
 
-/*
-            Thread outputThread = new Thread(new Runnable()
-            {
-
-                private int _lastWrite;
-
-                public void run()
-                {
-                    try
-                    {
-//                        doRead(handler, inputStream);
-                        final Object lock = new Object();
-                        transport.setOutputStateChangeListener(new StateChangeListener()
-                        {
-
-                            public void onStateChange(final boolean active)
-                            {
-                                synchronized (lock)
-                                {
-                                    lock.notifyAll();
-                                }
-                            }
-                        });
-
-                        synchronized(lock)
-                        {
-                            while(transport.isOpenForOutput())
-                            {
-                                _lastWrite = 0;
-                                transport.getNextBytes(new BytesProcessor()
-                                {
-
-                                    public void processBytes(final ByteBuffer buf)
-                                    {
-                                        _lastWrite = buf.remaining();
-                                        try
-                                        {
-                                            outputStream.write(buf.array(),
-                                                               buf.arrayOffset() + buf.position(),
-                                                               buf.limit() - buf.position());
-                                        }
-                                        catch (IOException e)
-                                        {
-                                            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                                        }
-                                    }
-                                });
-                                if(_lastWrite == 0 && transport.isOpenForOutput())
-                                {
-                                    try
-                                    {
-                                        lock.wait(1000);
-                                    }
-                                    catch (InterruptedException e)
-                                    {
-                                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                                    }
-                                }
-                            }
-                        }
-                    }
-                    finally
-                    {
-                        if(_conn.closedForInput() && _conn.closedForOutput())
-                        {
-                            try
-                            {
-                                s.close();
-                            }
-                            catch (IOException e)
-                            {
-                                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                            }
-                        }
-                    }
-                }
-            });
-*/
-
             _conn.open();
 
         }
@@ -394,7 +309,7 @@ public class Connection
         }
         catch (IOException e)
         {
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            e.printStackTrace();
         }
 
     }
@@ -419,7 +334,7 @@ public class Connection
         {
             int read;
             boolean done = false;
-            while(!done && (read = inputStream.read(buf)) != -1)
+            while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
             {
                 ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
                 Binary b = new Binary(buf,0,read);
@@ -428,12 +343,6 @@ public class Connection
                 {
                     RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString());
                 }
-                /*System.err.println(b);
-                System.err.println("XXX: " + bbuf.hasRemaining() + "; " + handler.isDone());
-                if(handler.isDone())
-                {
-                    System.err.println(handler.getClass().getName() + "IS DONE!");
-                } */
                 while(bbuf.hasRemaining() && !handler.isDone())
                 {
                     handler.parse(bbuf);
@@ -444,7 +353,7 @@ public class Connection
         }
         catch (IOException e)
         {
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            e.printStackTrace();
         }
     }
 

Modified: qpid/branches/asyncstore/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java Thu Feb 28 16:14:30 2013
@@ -241,7 +241,7 @@ public class Receiver implements Deliver
                     }
                     if(hasMore)
                     {
-                        xfr = receiveFromPrefetch(0L);
+                        xfr = receiveFromPrefetch(-1l);
                         if(xfr== null)
                         {
                             // TODO - this is wrong!!!!
@@ -503,6 +503,37 @@ public class Receiver implements Deliver
         _endpoint.drain();
     }
 
+    /**
+     * Waits for the receiver to drain or a message to be available to be received.
+     * @return true if the receiver has been drained.
+     */
+    public boolean drainWait()
+    {
+        final Object lock = _endpoint.getLock();
+        synchronized(lock)
+        {
+            try
+            {
+                while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() )
+                {
+                    lock.wait();
+                }
+            }
+            catch (InterruptedException e)
+            {
+            }
+        }
+        return _prefetchQueue.peek()==null && _endpoint.isDrained();
+    }
+
+    /**
+     * Clears the receiver drain so that message delivery can resume.
+     */
+    public void clearDrain()
+    {
+        _endpoint.clearDrain();
+    }
+
     public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn)
     {
         _endpoint.setLinkCredit(credit);
@@ -558,4 +589,4 @@ public class Receiver implements Deliver
         void messageArrived(Receiver receiver);
     }
 
-}
\ No newline at end of file
+}

Propchange: qpid/branches/asyncstore/java/amqp-1-0-common/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/amqp-1-0-common:r1375509-1450773

Modified: qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java Thu Feb 28 16:14:30 2013
@@ -77,13 +77,21 @@ public class FrameWriter implements Valu
             {
                 case SIZE_0:
 
-                    _typeWriter.setValue(_frame.getFrameBody());
-
                     int payloadLength = _payload == null ? 0 : _payload.remaining();
 
-                    _size = _typeWriter.writeToBuffer(remaining > 8
-                                                      ? (ByteBuffer)buffer.duplicate().position(buffer.position()+8)
-                                                      : ByteBuffer.wrap(EMPTY_BYTE_ARRAY)) + 8 + payloadLength;
+                    if(_typeWriter!=null)
+                    {
+                        _typeWriter.setValue(_frame.getFrameBody());
+
+
+                        _size = _typeWriter.writeToBuffer(remaining > 8
+                                                          ? (ByteBuffer)buffer.duplicate().position(buffer.position()+8)
+                                                          : ByteBuffer.wrap(EMPTY_BYTE_ARRAY)) + 8 + payloadLength;
+                    }
+                    else
+                    {
+                        _size = 8 + payloadLength;
+                    }
                     if(remaining >= 4)
                     {
                         buffer.putInt(_size);
@@ -239,7 +247,14 @@ public class FrameWriter implements Valu
         _size = -1;
         _payload = null;
         final Object frameBody = frame.getFrameBody();
-        _typeWriter = _registry.getValueWriter(frameBody);
+        if(frameBody!=null)
+        {
+            _typeWriter = _registry.getValueWriter(frameBody);
+        }
+        else
+        {
+            _typeWriter = null;
+        }
         _payload = frame.getPayload() == null ? null : frame.getPayload().duplicate();
     }
 }

Modified: qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java Thu Feb 28 16:14:30 2013
@@ -21,6 +21,7 @@
 
 package org.apache.qpid.amqp_1_0.framing;
 
+import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.FrameBody;
 
 import java.nio.ByteBuffer;
@@ -65,4 +66,11 @@ public abstract class AMQFrame<T>
         return _frameBody;
     }
 
+    @Override
+    public String toString()
+    {
+        return "AMQFrame{" +
+               "frameBody=" + _frameBody +
+               '}';
+    }
 }

Modified: qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java Thu Feb 28 16:14:30 2013
@@ -103,6 +103,7 @@ public class ConnectionHandler
 
         private boolean _setForClose;
         private boolean _closed;
+        private long _nextHeartbeat;
 
         public FrameOutput(final ConnectionEndpoint conn)
         {
@@ -165,14 +166,34 @@ public class ConnectionHandler
         {
             synchronized(_conn.getLock())
             {
+                long time = System.currentTimeMillis();
                 try
                 {
                     AMQFrame frame = null;
                     while(!closed() && (frame = _queue.poll()) == null && wait)
                     {
-                        _conn.getLock().wait();
+                        _conn.getLock().wait(_conn.getIdleTimeout()/2);
+
+                        if(_conn.getIdleTimeout()>0)
+                        {
+                            time = System.currentTimeMillis();
+
+                            if(frame == null && time > _nextHeartbeat)
+                            {
+                                frame = new TransportFrame((short) 0,null);
+                                break;
+                            }
+                        }
                     }
 
+
+
+
+                    if(frame != null)
+                    {
+                        _nextHeartbeat = time + _conn.getIdleTimeout()/2;
+
+                    }
                     if(frame == _endOfFrameMarker)
                     {
                         _closed = true;

Modified: qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Thu Feb 28 16:14:30 2013
@@ -81,6 +81,8 @@ public class ConnectionEndpoint implemen
     private boolean _closedForInput;
     private boolean _closedForOutput;
 
+    private long _idleTimeout;
+
     private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
                 .registerTransportLayer()
                 .registerMessagingLayer()
@@ -282,6 +284,11 @@ public class ConnectionEndpoint implemen
 
         _remoteContainerId = open.getContainerId();
 
+        if(open.getIdleTimeOut() != null)
+        {
+            _idleTimeout = open.getIdleTimeOut().longValue();
+        }
+
         switch(_state)
         {
             case UNOPENED:
@@ -316,6 +323,7 @@ public class ConnectionEndpoint implemen
                 sendClose(new Close());
                 break;
             case CLOSE_SENT:
+
             default:
         }
     }
@@ -650,6 +658,11 @@ public class ConnectionEndpoint implemen
         return this;
     }
 
+    public synchronized long getIdleTimeout()
+    {
+        return _idleTimeout;
+    }
+
     public synchronized void close()
     {
         switch(_state)

Modified: qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java Thu Feb 28 16:14:30 2013
@@ -71,6 +71,10 @@ public class Delivery
         {
             setComplete(true);
         }
+        if(Boolean.TRUE.equals(transfer.getSettled()))
+        {
+            setSettled(true);
+        }
     }
 
     public List<Transfer> getTransfers()

Modified: qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java (original)
+++ qpid/branches/asyncstore/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java Thu Feb 28 16:14:30 2013
@@ -113,33 +113,37 @@ public class ReceivingLinkEndpoint exten
         synchronized (getLock())
         {
             TransientState transientState;
-            boolean existingState = _unsettledMap.containsKey(transfer.getDeliveryTag());
-            _unsettledMap.put(transfer.getDeliveryTag(), transfer.getState());
+            final Binary deliveryTag = delivery.getDeliveryTag();
+            boolean existingState = _unsettledMap.containsKey(deliveryTag);
+            if(!existingState || transfer.getState() != null)
+            {
+                _unsettledMap.put(deliveryTag, transfer.getState());
+            }
             if(!existingState)
             {
                 transientState = new TransientState(transfer.getDeliveryId());
-                if(Boolean.TRUE.equals(transfer.getSettled()))
+                if(delivery.isSettled())
                 {
                     transientState.setSettled(true);
                 }
-                _unsettledIds.put(transfer.getDeliveryTag(), transientState);
+                _unsettledIds.put(deliveryTag, transientState);
                 setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
                 setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE));
 
             }
             else
             {
-                transientState = _unsettledIds.get(transfer.getDeliveryTag());
+                transientState = _unsettledIds.get(deliveryTag);
                 transientState.incrementCredit();
-                if(Boolean.TRUE.equals(transfer.getSettled()))
+                if(delivery.isSettled())
                 {
                     transientState.setSettled(true);
                 }
             }
 
-            if(transientState.isSettled())
+            if(transientState.isSettled() && delivery.isComplete())
             {
-                _unsettledMap.remove(transfer.getDeliveryTag());
+                _unsettledMap.remove(deliveryTag);
             }
             getLinkEventListener().messageTransfer(transfer);
 
@@ -155,7 +159,7 @@ public class ReceivingLinkEndpoint exten
             super.receiveFlow(flow);
             _remoteDrain = Boolean.TRUE.equals((Boolean)flow.getDrain());
             setAvailable(flow.getAvailable());
-            _remoteTransferCount = flow.getDeliveryCount();
+            setDeliveryCount(flow.getDeliveryCount());
             getLock().notifyAll();
         }
     }
@@ -371,7 +375,7 @@ public class ReceivingLinkEndpoint exten
                     tag = iter.next();
                     tagsToUpdate.add(tag);
 
-                    deliveryId = _unsettledIds.get(firstTag).getDeliveryId();
+                    deliveryId = _unsettledIds.get(tag).getDeliveryId();
 
                     if(deliveryId.equals(last.add(UnsignedInteger.ONE)))
                     {

Modified: qpid/branches/asyncstore/java/bdbstore/bin/backup.sh
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/bdbstore/bin/backup.sh?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/bdbstore/bin/backup.sh (original)
+++ qpid/branches/asyncstore/java/bdbstore/bin/backup.sh Thu Feb 28 16:14:30 2013
@@ -34,11 +34,8 @@ if [ -z "${QPID_HOME}" ]; then
     export QPID_HOME=`cd ${WHEREAMI}/../ && pwd`
 fi
 
-VERSION=0.19
-
 # BDB's je JAR expected to be found in lib/opt
-LIBS="${QPID_HOME}/lib/opt/*:${QPID_HOME}/lib/qpid-bdbstore-${VERSION}.jar:${QPID_HOME}/lib/qpid-all.jar"
-
+LIBS="${QPID_HOME}/lib/opt/*:${QPID_HOME}/lib/qpid-all.jar"
 
 echo "Starting Hot Backup Script"
 java -Dlog4j.configuration=backup-log4j.xml ${JAVA_OPTS} -cp "${LIBS}" org.apache.qpid.server.store.berkeleydb.BDBBackup "${ARGS[@]}"

Modified: qpid/branches/asyncstore/java/bdbstore/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/bdbstore/build.xml?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/bdbstore/build.xml (original)
+++ qpid/branches/asyncstore/java/bdbstore/build.xml Thu Feb 28 16:14:30 2013
@@ -18,7 +18,7 @@
  -->
 <project name="bdbstore" xmlns:ivy="antlib:org.apache.ivy.ant" default="build">
     <property name="module.depends" value="common broker" />
-    <property name="module.test.depends" value="test client common/test broker/test management/common systests" />
+    <property name="module.test.depends" value="client common/tests broker/tests management/common systests broker-plugins/management-jmx" />
     <property name="module.genpom" value="true"/>
 
     <import file="../module.xml" />
@@ -78,19 +78,4 @@ http://www.oracle.com/technetwork/databa
 
     <target name="build" depends="check-request-props, bdb-jar-required, module.build" />
 
-    <target name="postbuild" depends="copy-store-to-upgrade" />
-
-    <target name="copy-store-to-upgrade" description="copy the upgrade tool resource folder contents into the build tree">
-      <copy todir="${qpid.home}" failonerror="true">
-        <fileset dir="src/test/resources/upgrade"/>
-      </copy>
-    </target>
-
-    <target name="precompile-tests">
-        <mkdir dir="${module.test.resources}"/>
-        <copy todir="${module.test.resources}">
-            <fileset dir="src/test/resources"/>
-        </copy>
-    </target>
-
 </project>

Modified: qpid/branches/asyncstore/java/bdbstore/jmx/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/bdbstore/jmx/build.xml?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/bdbstore/jmx/build.xml (original)
+++ qpid/branches/asyncstore/java/bdbstore/jmx/build.xml Thu Feb 28 16:14:30 2013
@@ -18,13 +18,13 @@
  -->
 <project name="bdbstore-jmx" default="build">
     <property name="module.depends" value="common broker broker-plugins/management-jmx management/common bdbstore" />
-    <property name="module.test.depends" value="test broker/test common/test management/common client systests bdbstore/test" />
+    <property name="module.test.depends" value="broker/tests common/tests management/common client systests bdbstore/tests" />
 
-    <property name="module.manifest" value="MANIFEST.MF" />
-    <property name="module.plugin" value="true" />
     <property name="module.genpom" value="true"/>
     <property name="module.genpom.args" value="-Sqpid-common=provided -Sqpid-broker=provided -Sqpid-broker-plugins-management-jmx=provided -Sqpid-management-common=provided -Sqpid-bdbstore=provided -Sje=provided"/>
 
+    <property name="broker.plugin" value="true"/>
+
     <import file="../../module.xml" />
 
     <target name="bundle" depends="bundle-tasks" />

Modified: qpid/branches/asyncstore/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java (original)
+++ qpid/branches/asyncstore/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java Thu Feb 28 16:14:30 2013
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import javax.management.JMException;
+import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
 import javax.management.openmbean.CompositeType;
@@ -91,7 +92,7 @@ public class BDBHAMessageStoreManagerMBe
     @Override
     public String getObjectInstanceName()
     {
-        return _store.getName();
+        return ObjectName.quote(_store.getName());
     }
 
     @Override

Modified: qpid/branches/asyncstore/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java (original)
+++ qpid/branches/asyncstore/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java Thu Feb 28 16:14:30 2013
@@ -28,13 +28,11 @@ import org.apache.qpid.server.jmx.MBeanP
 import org.apache.qpid.server.jmx.ManagedObject;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
 /**
  * This provide will create a {@link BDBHAMessageStoreManagerMBean} if the child is a virtual
- * host and of type {@link BDBHAMessageStore#BDB_HA_STORE_TYPE}.
+ * host and of type {@link BDBHAMessageStore#TYPE}.
  *
  */
 public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
@@ -50,7 +48,7 @@ public class BDBHAMessageStoreManagerMBe
     public boolean isChildManageableByMBean(ConfiguredObject child)
     {
         return (child instanceof VirtualHost
-            && BDBHAMessageStore.BDB_HA_STORE_TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
+            && BDBHAMessageStore.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
     }
 
     @Override
@@ -58,10 +56,7 @@ public class BDBHAMessageStoreManagerMBe
     {
         VirtualHost virtualHostChild = (VirtualHost) child;
 
-        VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
-        org.apache.qpid.server.virtualhost.VirtualHost vhost = virtualHostRegistry.getVirtualHost(virtualHostChild.getName());
-
-        BDBHAMessageStore messageStore = (BDBHAMessageStore) vhost.getMessageStore();
+        BDBHAMessageStore messageStore = (BDBHAMessageStore) virtualHostChild.getMessageStore();
 
         if (LOGGER.isDebugEnabled())
         {

Modified: qpid/branches/asyncstore/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider (original)
+++ qpid/branches/asyncstore/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider Thu Feb 28 16:14:30 2013
@@ -1 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
 org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBeanProvider

Modified: qpid/branches/asyncstore/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java (original)
+++ qpid/branches/asyncstore/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java Thu Feb 28 16:14:30 2013
@@ -30,6 +30,7 @@ import java.util.Iterator;
 import java.util.Set;
 
 import javax.jms.Connection;
+import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
 
@@ -37,7 +38,6 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.management.common.mbeans.ManagedBroker;
 import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
-import org.apache.qpid.server.virtualhost.ManagedVirtualHost;
 import org.apache.qpid.test.utils.JMXTestUtils;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
@@ -55,7 +55,7 @@ public class HAClusterManagementTest ext
     private static final Set<String> NON_MASTER_STATES = new HashSet<String>(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));;
     private static final String VIRTUAL_HOST = "test";
 
-    private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST;
+    private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST);
     private static final int NUMBER_OF_NODES = 4;
 
     private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
@@ -67,7 +67,6 @@ public class HAClusterManagementTest ext
     protected void setUp() throws Exception
     {
         _brokerType = BrokerType.SPAWNED;
-        _jmxUtils.setUp();
 
         _clusterCreator.configureClusterNodes();
         _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
@@ -132,12 +131,11 @@ public class HAClusterManagementTest ext
         final int brokerPortNumber = getBrokerPortNumbers().iterator().next();
 
         ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
+        awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES);
+
         final TabularData groupMembers = storeBean.getAllNodesInGroup();
         assertNotNull(groupMembers);
 
-        final int numberOfDataRows = groupMembers.size();
-        assertEquals("Unexpected number of data rows", NUMBER_OF_NODES ,numberOfDataRows);
-
         for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers())
         {
             final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber);
@@ -155,8 +153,7 @@ public class HAClusterManagementTest ext
         final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next();
         final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next();
         final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation);
-        final int numberOfDataRows = storeBean.getAllNodesInGroup().size();
-        assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES ,numberOfDataRows);
+        awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES);
 
         final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved));
         _clusterCreator.stopNode(brokerPortNumberToBeRemoved);
@@ -266,4 +263,27 @@ public class HAClusterManagementTest ext
 
         return _jmxUtils.getManagedBroker(VIRTUAL_HOST);
     }
+
+    private void awaitAllNodesJoiningGroup(ManagedBDBHAMessageStore storeBean, int expectedNumberOfNodes) throws Exception
+    {
+        long totalTimeWaited = 0l;
+        long waitInterval = 100l;
+        long maxWaitTime = 10000;
+
+        int currentNumberOfNodes = storeBean.getAllNodesInGroup().size();
+        while (expectedNumberOfNodes > currentNumberOfNodes || totalTimeWaited > maxWaitTime)
+        {
+            LOGGER.debug("Still awaiting nodes to join group; expecting "
+                + expectedNumberOfNodes + " node(s) but only have " + currentNumberOfNodes
+                + " after " + totalTimeWaited + " ms.");
+
+            totalTimeWaited += waitInterval;
+            Thread.sleep(waitInterval);
+
+            currentNumberOfNodes = storeBean.getAllNodesInGroup().size();
+        }
+
+        assertEquals("Unexpected number of nodes in group after " + totalTimeWaited + " ms",
+                expectedNumberOfNodes ,currentNumberOfNodes);
+    }
 }

Modified: qpid/branches/asyncstore/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java (original)
+++ qpid/branches/asyncstore/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java Thu Feb 28 16:14:30 2013
@@ -27,6 +27,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
+import javax.management.ObjectName;
 
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
@@ -41,7 +42,7 @@ public class HAClusterTwoNodeTest extend
 
     private static final String VIRTUAL_HOST = "test";
 
-    private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST;
+    private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST);
     private static final int NUMBER_OF_NODES = 2;
 
     private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
@@ -56,7 +57,6 @@ public class HAClusterTwoNodeTest extend
 
         assertTrue(isJavaBroker());
         assertTrue(isBrokerStorePersistent());
-        _jmxUtils.setUp();
 
         super.setUp();
     }
@@ -86,11 +86,11 @@ public class HAClusterTwoNodeTest extend
 
         String storeConfigKeyPrefix = _clusterCreator.getStoreConfigKeyPrefix();
 
-        setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT);
-        setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).value", "2 s");
+        setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT);
+        setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).value", "2 s");
 
-        setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES);
-        setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0");
+        setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES);
+        setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0");
 
         _clusterCreator.configureClusterNodes();
         _clusterCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary);

Modified: qpid/branches/asyncstore/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java (original)
+++ qpid/branches/asyncstore/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java Thu Feb 28 16:14:30 2013
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 
 import javax.management.JMException;
+import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeType;
 import javax.management.openmbean.TabularData;
@@ -84,7 +85,7 @@ public class BDBHAMessageStoreManagerMBe
     {
         when(_store.getName()).thenReturn(TEST_STORE_NAME);
 
-        String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + TEST_STORE_NAME;
+        String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_STORE_NAME);
         assertEquals(expectedObjectName, _mBean.getObjectName().toString());
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org