You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/14 15:46:38 UTC

svn commit: r1558056 [2/4] - in /qpid/branches/java-broker-bdb-ha/qpid/java: ./ amqp-1-0-client-jms/ amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ amqp-1-0-client-websocket/ amqp-1-0-client-websocket/resources/ amqp-1-0-client/ a...

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java Tue Jan 14 14:46:35 2014
@@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.codec.De
 import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
 import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
 import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Predicate;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
 import org.apache.qpid.amqp_1_0.type.*;
@@ -39,10 +40,15 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
 import org.apache.qpid.amqp_1_0.type.transport.Error;
 
 public class Sender implements DeliveryStateHandler
 {
+    private static final long UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER = 1000l;
+    private static final long DEFAULT_CREDIT_TIMEOUT = 30000l;
+
     private SendingLinkEndpoint _endpoint;
     private int _id;
     private Session _session;
@@ -150,17 +156,26 @@ public class Sender implements DeliveryS
 
         synchronized(_endpoint.getLock())
         {
-            while(!(_endpoint.isAttached() || _endpoint.isDetached()))
+            try
             {
-                try
-                {
-                    _endpoint.getLock().wait();
-                }
-                catch (InterruptedException e)
-                {
-                    throw new SenderCreationException(e);
-                }
+                _endpoint.waitUntil(new Predicate()
+                                    {
+                                        @Override
+                                        public boolean isSatisfied()
+                                        {
+                                            return _endpoint.isAttached() || _endpoint.isDetached();
+                                        }
+                                    });
+            }
+            catch (TimeoutException e)
+            {
+                throw new SenderCreationException(e);
             }
+            catch (InterruptedException e)
+            {
+                throw new SenderCreationException(e);
+            }
+
             if (session.getEndpoint().isEnded())
             {
                 throw new SenderCreationException("Session is closed while creating link, target: " + target.getAddress());
@@ -225,22 +240,22 @@ public class Sender implements DeliveryS
         return _endpoint.getTarget();
     }
 
-    public void send(Message message) throws LinkDetachedException
+    public void send(Message message) throws LinkDetachedException, TimeoutException
     {
         send(message, null, null);
     }
 
-    public void send(Message message, final OutcomeAction action) throws LinkDetachedException
+    public void send(Message message, final OutcomeAction action) throws LinkDetachedException, TimeoutException
     {
         send(message, null, action);
     }
 
-    public void send(Message message, final Transaction txn) throws LinkDetachedException
+    public void send(Message message, final Transaction txn) throws LinkDetachedException, TimeoutException
     {
         send(message, txn, null);
     }
 
-    public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
+    public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException, TimeoutException
     {
 
         List<Section> sections = message.getPayload();
@@ -290,19 +305,26 @@ public class Sender implements DeliveryS
             xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
         }
         final Object lock = _endpoint.getLock();
+
         synchronized(lock)
         {
-            while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
+
+            try
             {
-                try
-                {
-                    lock.wait();
-                }
-                catch (InterruptedException e)
-                {
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                }
+                _endpoint.waitUntil(new Predicate()
+                                    {
+                                        @Override
+                                        public boolean isSatisfied()
+                                        {
+                                            return _endpoint.hasCreditToSend() || _endpoint.isDetached();
+                                        }
+                                    }, getCreditTimeout());
             }
+            catch (InterruptedException e)
+            {
+                throw new TimeoutException("Interrupted while waiting for credit");
+            }
+
             if(_endpoint.isDetached())
             {
                 throw new LinkDetachedException(_error);
@@ -312,27 +334,24 @@ public class Sender implements DeliveryS
                 _outcomeActions.put(message.getDeliveryTag(), action);
             }
             _endpoint.transfer(xfr);
-            //TODO - rationalise sending of flows
-            // _endpoint.sendFlow();
         }
 
         if(_windowSize != 0)
         {
-            synchronized(lock)
+            try
             {
-
-
-                while(_endpoint.getUnsettledCount() >= _windowSize)
-                {
-                    try
-                    {
-                        lock.wait();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                    }
-                }
+                _endpoint.waitUntil(new Predicate()
+                                    {
+                                        @Override
+                                        public boolean isSatisfied()
+                                        {
+                                            return _endpoint.getUnsettledCount() < _windowSize;
+                                        }
+                                    }, getUnsettledTimeout());
+            }
+            catch (InterruptedException e)
+            {
+                throw new TimeoutException("Interrupted while waiting for the window to expand to allow sending");
             }
 
         }
@@ -340,48 +359,80 @@ public class Sender implements DeliveryS
 
     }
 
+    private long getCreditTimeout()
+    {
+        return _endpoint.getSyncTimeout() < DEFAULT_CREDIT_TIMEOUT ? DEFAULT_CREDIT_TIMEOUT : _endpoint.getSyncTimeout();
+    }
+
     public void close() throws SenderClosingException
     {
+        boolean unsettledDeliveries = false;
 
         if(_windowSize != 0)
         {
-            synchronized(_endpoint.getLock())
-            {
-
+            long timeout = getUnsettledTimeout();
 
-                while(_endpoint.getUnsettledCount() > 0)
+            try
+            {
+                _endpoint.waitUntil(new Predicate()
                 {
-                    try
-                    {
-                        _endpoint.getLock().wait();
-                    }
-                    catch (InterruptedException e)
+                    @Override
+                    public boolean isSatisfied()
                     {
-                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                        return _endpoint.getUnsettledCount() == 0;
                     }
-                }
+                }, timeout);
+            }
+            catch (InterruptedException e)
+            {
+                unsettledDeliveries = true;
+            }
+            catch (TimeoutException e)
+            {
+                unsettledDeliveries = true;
             }
 
         }
         _session.removeSender(this);
         _endpoint.setSource(null);
-        _endpoint.detach();
+        _endpoint.close();
         _closed = true;
 
-        synchronized(_endpoint.getLock())
+        try
         {
-            while(!_endpoint.isDetached())
+            _endpoint.waitUntil(new Predicate()
             {
-                try
-                {
-                    _endpoint.getLock().wait();
-                }
-                catch (InterruptedException e)
+                @Override
+                public boolean isSatisfied()
                 {
-                    throw new SenderClosingException(e);
+                    return _endpoint.isDetached();
                 }
-            }
+            });
+        }
+        catch (TimeoutException e)
+        {
+            throw new SenderClosingException("Timed out attempting to detach link", e);
         }
+        catch (InterruptedException e)
+        {
+            throw new SenderClosingException("Interrupted while attempting to detach link", e);
+        }
+        if(unsettledDeliveries && _endpoint.getUnsettledCount() > 0)
+        {
+            throw new SenderClosingException("Some messages may not have been received by the recipient");
+        }
+    }
+
+    private long getUnsettledTimeout()
+    {
+        long timeout = _endpoint.getSyncTimeout();
+
+        // give a generous timeout where there are unsettled messages
+        if(timeout < _endpoint.getUnsettledCount() * UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER)
+        {
+            timeout = _endpoint.getUnsettledCount() * UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER;
+        }
+        return timeout;
     }
 
     public boolean isClosed()
@@ -468,10 +519,20 @@ public class Sender implements DeliveryS
 
     public class SenderClosingException extends Exception
     {
+        public SenderClosingException(final String message, final Throwable cause)
+        {
+            super(message, cause);
+        }
+
         public SenderClosingException(Throwable e)
         {
             super(e);
         }
+
+        public SenderClosingException(final String message)
+        {
+            super(message);
+        }
     }
 
     public static interface OutcomeAction

Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/amqp-1-0-common:r1549895-1558036

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java Tue Jan 14 14:46:35 2014
@@ -66,20 +66,7 @@ public class SymbolTypeConstructor exten
         if(symbolVal == null)
         {
             ByteBuffer dup = in.duplicate();
-            try
-            {
-                dup.limit(in.position()+size);
-            }
-            catch (IllegalArgumentException e)
-            {
-                System.err.println("in.position(): " + in.position());
-                System.err.println("size: " + size);
-                System.err.println("dup.position(): " + dup.position());
-                System.err.println("dup.capacity(): " + dup.capacity());
-                System.err.println("dup.limit(): " + dup.limit());
-                throw e;
-
-            }
+            dup.limit(in.position()+size);
             CharBuffer charBuf = ASCII.decode(dup);
 
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java Tue Jan 14 14:46:35 2014
@@ -65,6 +65,12 @@ public class ConnectionHandler
     public boolean parse(ByteBuffer in)
     {
 
+        if(RAW_LOGGER.isLoggable(Level.FINE))
+        {
+            Binary b = new Binary(in.array(),in.arrayOffset()+in.position(),in.remaining());
+            RAW_LOGGER.fine("RECV [" + _connection.getRemoteAddress() + "] : " + b.toString());
+        }
+
         while(in.hasRemaining() && !isDone())
         {
             _delegate = _delegate.parse(in);
@@ -376,6 +382,47 @@ public class ConnectionHandler
     }
 
 
+    public static class SequentialFrameSource implements FrameSource
+    {
+        private Queue<FrameSource> _sources = new LinkedList<FrameSource>();
+
+        public SequentialFrameSource(FrameSource... sources)
+        {
+            _sources.addAll(Arrays.asList(sources));
+        }
+
+        public synchronized void addSource(FrameSource source)
+        {
+            _sources.add(source);
+        }
+
+        @Override
+        public synchronized AMQFrame getNextFrame(final boolean wait)
+        {
+            FrameSource src = _sources.peek();
+            while (src != null && src.closed())
+            {
+                _sources.poll();
+                src = _sources.peek();
+            }
+
+            if(src != null)
+            {
+                return src.getNextFrame(wait);
+            }
+            else
+            {
+                return null;
+            }
+        }
+
+        public boolean closed()
+        {
+            return _sources.isEmpty();
+        }
+    }
+
+
     public static class BytesOutputHandler implements Runnable, BytesProcessor
     {
 
@@ -383,28 +430,28 @@ public class ConnectionHandler
         private BytesSource _bytesSource;
         private boolean _closed;
         private ConnectionEndpoint _conn;
-        private SocketExceptionHandler _exceptionHandler;
-
-        public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, SocketExceptionHandler exceptionHandler)
-            {
-                _outputStream = outputStream;
-                _bytesSource = source;
-                _conn = conn;
-                _exceptionHandler = exceptionHandler;
-            }
+        private ExceptionHandler _exceptionHandler;
 
-            public void run()
-            {
+        public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, ExceptionHandler exceptionHandler)
+        {
+            _outputStream = outputStream;
+            _bytesSource = source;
+            _conn = conn;
+            _exceptionHandler = exceptionHandler;
+        }
 
-                final BytesSource bytesSource = _bytesSource;
+        public void run()
+        {
 
-                while(!(_closed || bytesSource.closed()))
-                {
-                    _bytesSource.getBytes(this, true);
-                }
+            final BytesSource bytesSource = _bytesSource;
 
+            while(!(_closed || bytesSource.closed()))
+            {
+                _bytesSource.getBytes(this, true);
             }
 
+        }
+
         public void processBytes(final ByteBuffer buf)
         {
             try
@@ -423,7 +470,7 @@ public class ConnectionHandler
             catch (IOException e)
             {
                 _closed = true;
-                _exceptionHandler.processSocketException(e);
+                _exceptionHandler.handleException(e);
             }
         }
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java Tue Jan 14 14:46:35 2014
@@ -127,9 +127,9 @@ public class FrameHandler implements Pro
                         break;
                     }
 
-                    else if(size > _connection.getMaxFrameSize())
+                    else if(size > _connection.getDesiredMaxFrameSize().intValue())
                     {
-                        frameParsingError = createFramingError("specified frame size %d larger than maximum frame header size %d", size, _connection.getMaxFrameSize());
+                        frameParsingError = createFramingError("specified frame size %d larger than maximum frame header size %d", size, _connection.getDesiredMaxFrameSize().intValue());
                         state = State.ERROR;
                         break;
                     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Tue Jan 14 14:46:35 2014
@@ -40,10 +40,8 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.amqp_1_0.type.transport.Error;
 import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
 
-import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslServerFactory;
 
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -51,7 +49,7 @@ import java.nio.charset.Charset;
 import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Enumeration;
+import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -71,6 +69,7 @@ public class ConnectionEndpoint implemen
 
     private static final short DEFAULT_CHANNEL_MAX = Integer.getInteger("amqp.channel_max", 255).shortValue();
     private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15);
+    private static final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqp.connection_sync_timeout",5000l);
 
 
     private ConnectionState _state = ConnectionState.UNOPENED;
@@ -104,7 +103,7 @@ public class ConnectionEndpoint implemen
     private UnsignedInteger _handleMax = UnsignedInteger.MAX_VALUE;
     private ConnectionEventListener _connectionEventListener = ConnectionEventListener.DEFAULT;
     private String _password;
-    private final boolean _requiresSASLClient;
+    private boolean _requiresSASLClient;
     private final boolean _requiresSASLServer;
 
 
@@ -122,6 +121,7 @@ public class ConnectionEndpoint implemen
     private Error _remoteError;
 
     private Map _properties;
+    private long _syncTimeout = DEFAULT_SYNC_TIMEOUT;
 
     public ConnectionEndpoint(Container container, SaslServerProvider cbs)
     {
@@ -140,6 +140,14 @@ public class ConnectionEndpoint implemen
         _requiresSASLServer = false;
     }
 
+    public void setPrincipal(Principal user)
+    {
+        if(_user == null)
+        {
+            _user = user;
+            _requiresSASLClient = user != null;
+        }
+    }
 
     public synchronized void open()
     {
@@ -1054,4 +1062,42 @@ public class ConnectionEndpoint implemen
     {
         _channelMax = channelMax;
     }
+
+    public long getSyncTimeout()
+    {
+        return _syncTimeout;
+    }
+
+    public void setSyncTimeout(final long syncTimeout)
+    {
+        _syncTimeout = syncTimeout;
+    }
+
+    public void waitUntil(Predicate predicate) throws InterruptedException, TimeoutException
+    {
+        waitUntil(predicate, _syncTimeout);
+    }
+
+    public void waitUntil(Predicate predicate, long timeout) throws InterruptedException, TimeoutException
+    {
+        long endTime = System.currentTimeMillis() + timeout;
+
+        synchronized(getLock())
+        {
+            while(!predicate.isSatisfied())
+            {
+                getLock().wait(timeout);
+
+                if(!predicate.isSatisfied())
+                {
+                    timeout = endTime - System.currentTimeMillis();
+                    if(timeout <= 0l)
+                    {
+                        throw new TimeoutException();
+                    }
+                }
+            }
+        }
+
+    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java Tue Jan 14 14:46:35 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.amqp_1_0.type.tra
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeoutException;
 
 public abstract class LinkEndpoint<T extends LinkEventListener>
 {
@@ -324,6 +325,23 @@ public abstract class LinkEndpoint<T ext
         return _session.getLock();
     }
 
+
+    public long getSyncTimeout()
+    {
+        return _session.getSyncTimeout();
+    }
+
+    public void waitUntil(Predicate predicate) throws TimeoutException, InterruptedException
+    {
+        _session.waitUntil(predicate);
+    }
+
+    public void waitUntil(Predicate predicate, long timeout) throws TimeoutException, InterruptedException
+    {
+        _session.waitUntil(predicate, timeout);
+    }
+
+
     public void attach()
     {
         synchronized(getLock())

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java Tue Jan 14 14:46:35 2014
@@ -35,6 +35,7 @@ import org.apache.qpid.amqp_1_0.type.tra
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.TimeoutException;
 
 public class SessionEndpoint
 {
@@ -579,19 +580,7 @@ public class SessionEndpoint
             if(payload != null && payloadSent < payload.remaining())
             {
                 payload = payload.duplicate();
-try
-{
                 payload.position(payload.position()+payloadSent);
-}
-catch(IllegalArgumentException e)
-{
-    System.err.println("UNEXPECTED");
-    System.err.println("Payload Position: " + payload.position());
-    System.err.println("Payload Sent: " + payloadSent);
-    System.err.println("Payload Remaining: " + payload.remaining());
-    throw e;
-
-}
 
                 Transfer secondTransfer = new Transfer();
 
@@ -618,6 +607,23 @@ catch(IllegalArgumentException e)
         return _connection.getLock();
     }
 
+
+    public long getSyncTimeout()
+    {
+        return _connection.getSyncTimeout();
+    }
+
+    public void waitUntil(Predicate predicate) throws TimeoutException, InterruptedException
+    {
+        _connection.waitUntil(predicate);
+    }
+
+    public void waitUntil(Predicate predicate, long timeout) throws TimeoutException, InterruptedException
+    {
+        _connection.waitUntil(predicate, timeout);
+    }
+
+
     public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name,
                                                              String targetAddr,
                                                              String sourceAddr,

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java Tue Jan 14 14:46:35 2014
@@ -31,8 +31,7 @@ import java.util.Map;
 import org.apache.qpid.amqp_1_0.type.*;
 
 public class Error
-  {
-
+{
 
     private ErrorCondition _condition;
 
@@ -40,6 +39,16 @@ public class Error
 
     private Map _info;
 
+    public Error()
+    {
+    }
+
+    public Error(final ErrorCondition condition, final String description)
+    {
+        _condition = condition;
+        _description = description;
+    }
+
     public ErrorCondition getCondition()
     {
         return _condition;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Tue Jan 14 14:46:35 2014
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.message.EnqueableMessage;
@@ -84,6 +85,8 @@ public class BDBMessageStore implements 
     private static final String[] DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, MESSAGE_META_DATA_DB_NAME,
             MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME, CONFIG_VERSION_DB_NAME };
 
+    private final AtomicBoolean _closed = new AtomicBoolean(false);
+
     private EnvironmentFacade _environmentFacade;
     private final AtomicLong _messageId = new AtomicLong(0);
 
@@ -282,16 +285,19 @@ public class BDBMessageStore implements 
     @Override
     public void close() throws AMQStoreException
     {
-        _stateManager.attainState(State.CLOSING);
-        try
-        {
-            closeEnvironment();
-        }
-        catch(DatabaseException e)
+        if (_closed.compareAndSet(false, true))
         {
-            throw new AMQStoreException("Exception occured on message store close", e);
+	    _stateManager.attainState(State.CLOSING);
+	    try
+	    {
+		closeEnvironment();
+	    }
+	    catch(DatabaseException e)
+	    {
+		throw new AMQStoreException("Exception occured on message store close", e);
+	    }
+	    _stateManager.attainState(State.CLOSED);
         }
-        _stateManager.attainState(State.CLOSED);
     }
 
     private void closeEnvironment()

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html Tue Jan 14 14:46:35 2014
@@ -1,3 +1,19 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
 <table class="tableContainer-table tableContainer-table-horiz">
     <tr>
         <td class="tableContainer-labelCell" style="width: 300px;"><strong>Path to store location*: </strong></td>

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/store/bdb/add.html
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/store/bdb/add.html?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/store/bdb/add.html (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/store/bdb/add.html Tue Jan 14 14:46:35 2014
@@ -1,3 +1,19 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
 <table class="tableContainer-table tableContainer-table-horiz">
   <tr>
       <td class="tableContainer-labelCell" style="width: 300px;"><strong>Path to store location*:  </strong></td>

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java Tue Jan 14 14:46:35 2014
@@ -215,12 +215,10 @@ public class HAClusterManagementTest ext
         catch(RuntimeException rte)
         {
             //check cause was BDBs EnvironmentFailureException
-            boolean isExpectedException = rte.getMessage().contains(EnvironmentFailureException.class.getName());
-            if (!isExpectedException)
-            {
-                rte.printStackTrace();
-            }
-            assertTrue("Unexpected exception message:" + rte.getMessage(), isExpectedException);
+            assertTrue("Message '"+rte.getMessage()+"' does not contain '"
+                       + EnvironmentFailureException.class.getName()
+                       + "'.",
+                       rte.getMessage().contains(EnvironmentFailureException.class.getName()));
             // PASS
         }
     }

Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker-core:r1549895-1558036

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java Tue Jan 14 14:46:35 2014
@@ -29,8 +29,6 @@ import java.util.Set;
 
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.configuration.ConfigurationEntryStore;
 import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator;
 import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler;
@@ -125,6 +123,7 @@ public class Broker
         }
         catch(Exception e)
         {
+            LOGGER.fatal("Exception during startup", e);
             try
             {
                 _applicationRegistry.close();

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Tue Jan 14 14:46:35 2014
@@ -25,7 +25,6 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQSecurityException;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Tue Jan 14 14:46:35 2014
@@ -31,7 +31,6 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Tue Jan 14 14:46:35 2014
@@ -26,7 +26,6 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.AMQUnknownExchangeType;
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.plugin.QpidServiceLoader;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Tue Jan 14 14:46:35 2014
@@ -26,11 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.filter.JMSSelectorFilter;
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.plugin.ExchangeType;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java Tue Jan 14 14:46:35 2014
@@ -24,7 +24,6 @@ import java.util.UUID;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java Tue Jan 14 14:46:35 2014
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.exchange;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.plugin.ExchangeType;
 
 import java.util.Collection;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Tue Jan 14 14:46:35 2014
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.exchange;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
 
 import java.util.Collection;
 import java.util.UUID;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Tue Jan 14 14:46:35 2014
@@ -22,17 +22,12 @@ package org.apache.qpid.server.exchange;
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.InboundMessage;
@@ -41,7 +36,6 @@ import org.apache.qpid.server.queue.AMQQ
 import org.apache.qpid.server.queue.BaseQueue;
 
 import java.util.ArrayList;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class FanoutExchange extends AbstractExchange
 {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java Tue Jan 14 14:46:35 2014
@@ -24,7 +24,6 @@ import java.util.UUID;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java Tue Jan 14 14:46:35 2014
@@ -30,7 +30,6 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.filter.SelectorParsingException;
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.TokenMgrError;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.filter.JMSSelectorFilter;
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.InboundMessage;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Tue Jan 14 14:46:35 2014
@@ -23,8 +23,6 @@ package org.apache.qpid.server.exchange;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.framing.AMQTypedValue;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.AMQMessageHeader;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Tue Jan 14 14:46:35 2014
@@ -22,10 +22,7 @@ package org.apache.qpid.server.exchange;
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java Tue Jan 14 14:46:35 2014
@@ -24,7 +24,6 @@ import java.util.UUID;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Tue Jan 14 14:46:35 2014
@@ -29,8 +29,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.exchange.topic.TopicExchangeResult;
 import org.apache.qpid.server.exchange.topic.TopicMatcherResult;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java Tue Jan 14 14:46:35 2014
@@ -24,7 +24,6 @@ import java.util.UUID;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java Tue Jan 14 14:46:35 2014
@@ -28,7 +28,6 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.filter.SelectorParsingException;
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.TokenMgrError;
-import org.apache.qpid.framing.FieldTable;
 
 import java.util.Map;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java Tue Jan 14 14:46:35 2014
@@ -79,6 +79,7 @@ public interface Connection extends Conf
     public static final String REMOTE_PROCESS_PID = "remoteProcessPid";
     public static final String SESSION_COUNT_LIMIT = "sessionCountLimit";
     public static final String TRANSPORT = "transport";
+    /** Name of port associated with the connection */
     public static final String PORT = "port";
 
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java Tue Jan 14 14:46:35 2014
@@ -25,7 +25,10 @@ import java.util.EnumSet;
 public enum Transport
 {
     TCP,
-    SSL;
+    SSL,
+    WS,
+    WSS,
+    SCTP;
 
     public static Transport valueOfObject(Object transportObject)
     {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java Tue Jan 14 14:46:35 2014
@@ -19,12 +19,8 @@
  */
 package org.apache.qpid.server.model.adapter;
 
-import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
-
-import java.net.InetSocketAddress;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
@@ -42,21 +38,21 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.BrokerMessages;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.KeyStore;
-import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.TrustStore;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.plugin.TransportProviderFactory;
 import org.apache.qpid.server.protocol.AmqpProtocolVersion;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.server.transport.TransportProvider;
 import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager;
 
 public class AmqpPortAdapter extends PortAdapter
 {
     private final Broker _broker;
-    private IncomingNetworkTransport _transport;
+    private AcceptingTransport _transport;
 
     public AmqpPortAdapter(UUID id, Broker broker, Map<String, Object> attributes, Map<String, Object> defaultAttributes, TaskExecutor taskExecutor)
     {
@@ -70,42 +66,36 @@ public class AmqpPortAdapter extends Por
         Collection<Transport> transports = getTransports();
         Set<AmqpProtocolVersion> supported = convertFromModelProtocolsToAmqp(getProtocols());
 
-        SSLContext sslContext = null;
-        if (transports.contains(Transport.SSL))
+        TransportProvider transportProvider = null;
+        final HashSet<Transport> transportSet = new HashSet<Transport>(transports);
+        for(TransportProviderFactory tpf : (new QpidServiceLoader<TransportProviderFactory>()).instancesOf(TransportProviderFactory.class))
         {
-            sslContext = createSslContext();
+            if(tpf.getSupportedTransports().contains(transports))
+            {
+                transportProvider = tpf.getTransportProvider(transportSet);
+            }
         }
 
-        AmqpProtocolVersion defaultSupportedProtocolReply = getDefaultAmqpSupportedReply();
-
-        String bindingAddress = (String) getAttribute(Port.BINDING_ADDRESS);
-        if (WILDCARD_ADDRESS.equals(bindingAddress))
+        if(transportProvider == null)
         {
-            bindingAddress = null;
+            throw new IllegalConfigurationException("No transport providers found which can satisfy the requirement to support the transports: " + transports);
         }
-        Integer port = (Integer) getAttribute(Port.PORT);
-        InetSocketAddress bindingSocketAddress = null;
-        if ( bindingAddress == null )
-        {
-            bindingSocketAddress = new InetSocketAddress(port);
-        }
-        else
+
+        SSLContext sslContext = null;
+        if (transports.contains(Transport.SSL) || transports.contains(Transport.WSS))
         {
-            bindingSocketAddress = new InetSocketAddress(bindingAddress, port);
+            sslContext = createSslContext();
         }
 
-        final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(
-                bindingSocketAddress, (Boolean)getAttribute(TCP_NO_DELAY),
-                (Integer)getAttribute(SEND_BUFFER_SIZE), (Integer)getAttribute(RECEIVE_BUFFER_SIZE),
-                (Boolean)getAttribute(NEED_CLIENT_AUTH), (Boolean)getAttribute(WANT_CLIENT_AUTH));
-
-        _transport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
-        final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory(
-                _broker, transports.contains(Transport.TCP) ? sslContext : null,
-                settings.wantClientAuth(), settings.needClientAuth(),
-                supported, defaultSupportedProtocolReply, this, transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL);
+        AmqpProtocolVersion defaultSupportedProtocolReply = getDefaultAmqpSupportedReply();
 
-        _transport.accept(settings, protocolEngineFactory, transports.contains(Transport.TCP) ? null : sslContext);
+        _transport = transportProvider.createTransport(transportSet,
+                                                       sslContext,
+                                                       this,
+                                                       supported,
+                                                       defaultSupportedProtocolReply);
+
+        _transport.start();
         for(Transport transport : getTransports())
         {
             CurrentActor.get().message(BrokerMessages.LISTENING(String.valueOf(transport), getPort()));
@@ -210,68 +200,4 @@ public class AmqpPortAdapter extends Por
         }
         return null;
     }
-
-    class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration
-    {
-        private final InetSocketAddress _bindingSocketAddress;
-        private final Boolean _tcpNoDelay;
-        private final Integer _sendBufferSize;
-        private final Integer _receiveBufferSize;
-        private final boolean _needClientAuth;
-        private final boolean _wantClientAuth;
-
-        public ServerNetworkTransportConfiguration(
-                InetSocketAddress bindingSocketAddress, boolean tcpNoDelay,
-                int sendBufferSize, int receiveBufferSize,
-                boolean needClientAuth, boolean wantClientAuth)
-        {
-            _bindingSocketAddress = bindingSocketAddress;
-            _tcpNoDelay = tcpNoDelay;
-            _sendBufferSize = sendBufferSize;
-            _receiveBufferSize = receiveBufferSize;
-            _needClientAuth = needClientAuth;
-            _wantClientAuth = wantClientAuth;
-        }
-
-        @Override
-        public boolean wantClientAuth()
-        {
-            return _wantClientAuth;
-        }
-
-        @Override
-        public boolean needClientAuth()
-        {
-            return _needClientAuth;
-        }
-
-        @Override
-        public Boolean getTcpNoDelay()
-        {
-            return _tcpNoDelay;
-        }
-
-        @Override
-        public Integer getSendBufferSize()
-        {
-            return _sendBufferSize;
-        }
-
-        @Override
-        public Integer getReceiveBufferSize()
-        {
-            return _receiveBufferSize;
-        }
-
-        @Override
-        public InetSocketAddress getAddress()
-        {
-            return _bindingSocketAddress;
-        }
-    };
-
-    public String toString()
-    {
-        return getName();
-    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Tue Jan 14 14:46:35 2014
@@ -35,6 +35,7 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.Statistics;
@@ -226,7 +227,8 @@ final class ConnectionAdapter extends Ab
         }
         else if(name.equals(PORT))
         {
-            return String.valueOf(_connection.getPort());
+            Port port = _connection.getPort();
+            return String.valueOf(port == null ? null : port.getName());
         }
         return super.getAttribute(name);
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java Tue Jan 14 14:46:35 2014
@@ -556,4 +556,11 @@ public class PortAdapter extends Abstrac
 
         return trustStores;
     }
+
+    @Override
+    public String toString()
+    {
+        return getClass().getSimpleName() + " [id=" + getId() + ", name=" + getName() + ", port=" + getPort() + "]";
+    }
+
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java Tue Jan 14 14:46:35 2014
@@ -111,7 +111,7 @@ public class PortFactory
                 throw new IllegalConfigurationException("Can't create port which requests SSL client certificates but has no trust stores configured.");
             }
 
-            if(useClientAuth && !port.getTransports().contains(Transport.SSL))
+            if(useClientAuth && !(port.getTransports().contains(Transport.SSL) || port.getTransports().contains(Transport.WSS)))
             {
                 throw new IllegalConfigurationException("Can't create port which requests SSL client certificates but doesn't use SSL transport.");
             }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java Tue Jan 14 14:46:35 2014
@@ -23,7 +23,6 @@ package org.apache.qpid.server.plugin;
 import java.util.UUID;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Tue Jan 14 14:46:35 2014
@@ -30,6 +30,8 @@ import java.util.Set;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSocket;
+
 import org.apache.log4j.Logger;
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -38,6 +40,7 @@ import org.apache.qpid.server.model.Brok
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.transport.Binary;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.security.SSLStatus;
@@ -143,11 +146,6 @@ public class MultiVersionProtocolEngine 
 
     private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
 
-    public void setNetworkConnection(NetworkConnection networkConnection)
-    {
-        setNetworkConnection(networkConnection, networkConnection.getSender());
-    }
-
     public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
     {
         _network = network;
@@ -274,9 +272,9 @@ public class MultiVersionProtocolEngine 
 
         public void received(ByteBuffer msg)
         {
-
             _lastReadTime = System.currentTimeMillis();
-            ByteBuffer msgheader = msg.duplicate();
+            ByteBuffer msgheader = msg.duplicate().slice();
+
             if(_header.remaining() > msgheader.limit())
             {
                 msg.position(msg.limit());
@@ -329,6 +327,7 @@ public class MultiVersionProtocolEngine 
                     }
                 }
 
+
                 if(newDelegate == null && looksLikeSSL(headerBytes))
                 {
                     if(_sslContext !=  null)
@@ -475,7 +474,7 @@ public class MultiVersionProtocolEngine 
             SSLStatus sslStatus = new SSLStatus();
             _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus);
             _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus);
-            _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender));
+            _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender), _sslSender);
         }
 
         @Override
@@ -592,6 +591,9 @@ public class MultiVersionProtocolEngine 
         private final NetworkConnection _network;
         private final SSLBufferingSender _sslSender;
         private final SSLEngine _engine;
+        private Principal _principal;
+        private boolean _principalChecked;
+        private final Object _lock = new Object();
 
         public SSLNetworkConnection(SSLEngine engine, NetworkConnection network,
                                     SSLBufferingSender sslSender)
@@ -647,21 +649,25 @@ public class MultiVersionProtocolEngine 
         }
 
         @Override
-        public void setPeerPrincipal(Principal principal)
-        {
-            _network.setPeerPrincipal(principal);
-        }
-
-        @Override
         public Principal getPeerPrincipal()
         {
-            try
-            {
-                return _engine.getSession().getPeerPrincipal();
-            }
-            catch (SSLPeerUnverifiedException e)
+            synchronized (_lock)
             {
-                return null;
+                if(!_principalChecked)
+                {
+                    try
+                    {
+                        _principal =  _engine.getSession().getPeerPrincipal();
+                    }
+                    catch (SSLPeerUnverifiedException e)
+                    {
+                        _principal = null;
+                    }
+
+                    _principalChecked = true;
+                }
+
+                return _principal;
             }
         }
 

Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue:r1549895-1558036

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Jan 14 14:46:35 2014
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
@@ -35,7 +34,6 @@ import org.apache.qpid.server.virtualhos
 
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Tue Jan 14 14:46:35 2014
@@ -22,7 +22,6 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java Tue Jan 14 14:46:35 2014
@@ -21,7 +21,6 @@
 
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InboundMessage;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Jan 14 14:46:35 2014
@@ -38,7 +38,6 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.BrokerProperties;
@@ -734,7 +733,7 @@ public class SimpleAMQQueue implements A
                     && mightAssign(sub, entry)
                     && !sub.wouldSuspend(entry))
                 {
-                    if (sub.acquires() && !(assign(sub, entry) && entry.acquire(sub)))
+                    if (sub.acquires() && !assign(sub, entry))
                     {
                         // restore credit here that would have been taken away by wouldSuspend since we didn't manage
                         // to acquire the entry for this subscription
@@ -755,10 +754,18 @@ public class SimpleAMQQueue implements A
 
     private boolean assign(final Subscription sub, final QueueEntry entry)
     {
-        return _messageGroupManager == null || _messageGroupManager.acceptMessage(sub, entry);
+        if(_messageGroupManager == null)
+        {
+            //no grouping, try to acquire immediately.
+            return entry.acquire(sub);
+        }
+        else
+        {
+            //the group manager is responsible for acquiring the message if/when appropriate
+            return _messageGroupManager.acceptMessage(sub, entry);
+        }
     }
 
-
     private boolean mightAssign(final Subscription sub, final QueueEntry entry)
     {
         if(_messageGroupManager == null || !sub.acquires())
@@ -1646,7 +1653,7 @@ public class SimpleAMQQueue implements A
                 {
                     if (!sub.wouldSuspend(node))
                     {
-                        if (sub.acquires() && !(assign(sub, node) && node.acquire(sub)))
+                        if (sub.acquires() && !assign(sub, node))
                         {
                             // restore credit here that would have been taken away by wouldSuspend since we didn't manage
                             // to acquire the entry for this subscription

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SubjectCreator.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SubjectCreator.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SubjectCreator.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SubjectCreator.java Tue Jan 14 14:46:35 2014
@@ -37,6 +37,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManager;
 
 /**
  * Creates a {@link Subject} formed by the {@link Principal}'s returned from:
@@ -129,6 +130,17 @@ public class SubjectCreator
         }
     }
 
+    public Subject createSubjectWithGroups(Principal principal)
+    {
+        Subject authenticationSubject = new Subject();
+
+        authenticationSubject.getPrincipals().add(principal);
+        authenticationSubject.getPrincipals().addAll(getGroupPrincipals(principal.getName()));
+        authenticationSubject.setReadOnly();
+
+        return authenticationSubject;
+    }
+
     public Subject createSubjectWithGroups(String username)
     {
         Subject authenticationSubject = new Subject();
@@ -159,4 +171,9 @@ public class SubjectCreator
     {
         return _authenticationManager instanceof AnonymousAuthenticationManager;
     }
+
+    public boolean isExternalAuthenticationAllowed()
+    {
+        return _authenticationManager instanceof ExternalAuthenticationManager;
+    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Tue Jan 14 14:46:35 2014
@@ -79,7 +79,7 @@ abstract public class AbstractJDBCMessag
     private static final int DB_VERSION = 7;
 
     private final AtomicLong _messageId = new AtomicLong(0);
-    private AtomicBoolean _closed = new AtomicBoolean(false);
+    private final AtomicBoolean _closed = new AtomicBoolean(false);
 
     private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
     private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
@@ -683,12 +683,14 @@ abstract public class AbstractJDBCMessag
     @Override
     public void close() throws AMQStoreException
     {
-        _closed.getAndSet(true);
-        _stateManager.attainState(State.CLOSING);
+        if (_closed.compareAndSet(false, true))
+        {
+            _stateManager.attainState(State.CLOSING);
 
-        doClose();
+            doClose();
 
-        _stateManager.attainState(State.CLOSED);
+            _stateManager.attainState(State.CLOSED);
+        }
     }
 
 

Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java:r1549895-1558036

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Tue Jan 14 14:46:35 2014
@@ -20,9 +20,6 @@
 */
 package org.apache.qpid.server.store;
 
-import org.apache.qpid.framing.FieldTable;
-
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.UUID;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java Tue Jan 14 14:46:35 2014
@@ -29,14 +29,11 @@ import java.util.Map;
 
 import java.util.Set;
 import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
 
 public class DurableConfigurationStoreHelper
 {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java Tue Jan 14 14:46:35 2014
@@ -63,6 +63,18 @@ public class AssignedSubscriptionMessage
 
     public boolean acceptMessage(Subscription sub, QueueEntry entry)
     {
+        if(assignMessage(sub, entry))
+        {
+            return entry.acquire(sub);
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    private boolean assignMessage(Subscription sub, QueueEntry entry)
+    {
         Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
         if(groupVal == null)
         {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java Tue Jan 14 14:46:35 2014
@@ -136,9 +136,21 @@ public class DefinedGroupMessageGroupMan
 
     public synchronized boolean acceptMessage(final Subscription sub, final QueueEntry entry)
     {
+        if(assignMessage(sub, entry))
+        {
+            return entry.acquire(sub);
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    private boolean assignMessage(final Subscription sub, final QueueEntry entry)
+    {
         Object groupId = getKey(entry);
         Group group = _groupMap.get(groupId);
-        
+
         if(group == null || !group.isValid())
         {
             group = new Group(groupId, sub);
@@ -152,11 +164,10 @@ public class DefinedGroupMessageGroupMan
             {
                 return false;
             }
-
         }
-        
+
         Subscription assignedSub = group.getSubscription();
-        
+
         if(assignedSub == sub)
         {
             entry.addStateChangeListener(new GroupStateChangeListener(group, entry));
@@ -167,8 +178,7 @@ public class DefinedGroupMessageGroupMan
             return false;            
         }
     }
-    
-    
+
     public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub)
     {
         EntryFinder visitor = new EntryFinder(sub);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org