You are viewing a plain text version of this content. The canonical link to the original was a hyperlink and is not preserved in this plain text version.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/03/12 16:41:48 UTC

svn commit: r1666224 [3/7] - in /qpid/trunk: ./ qpid/ qpid/cpp/src/ qpid/cpp/src/qpid/broker/ qpid/cpp/src/qpid/management/ qpid/cpp/src/tests/ qpid/java/ qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ qpid/java/bdbstore/src/...

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java Thu Mar 12 15:41:46 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model.por
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
@@ -108,6 +110,14 @@ public class PortFactory<X extends Port<
     }
 
     @Override
+    public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+                                           final Map<String, Object> attributes,
+                                           final ConfiguredObject<?>... parents)
+    {
+        return getPortFactory(factory, attributes, (Broker<?>)parents[0]).createAsync(factory, attributes,parents);
+    }
+
+    @Override
     public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
                                                  final ConfiguredObjectRecord record,
                                                  final ConfiguredObject<?>... parents)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java Thu Mar 12 15:41:46 2015
@@ -20,19 +20,23 @@
  */
 package org.apache.qpid.server.plugin;
 
+import java.util.Map;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObjectFactory;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.UnresolvedConfiguredObject;
 
-import java.util.Map;
-
 public interface ConfiguredObjectTypeFactory<X extends ConfiguredObject<X>> extends Pluggable
 {
     Class<? super X> getCategoryClass();
 
     X create(final ConfiguredObjectFactory factory, Map<String, Object> attributes, ConfiguredObject<?>... parents);
 
+    ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+
     UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
                                           ConfiguredObjectRecord record,
                                           ConfiguredObject<?>... parents);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java Thu Mar 12 15:41:46 2015
@@ -19,7 +19,7 @@ package org.apache.qpid.server.plugin;/*
  *
  */
 
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Thu Mar 12 15:41:46 2015
@@ -40,7 +40,7 @@ public interface AMQConnectionModel<T ex
      * @param cause
      * @param message
      */
-    public void close(AMQConstant cause, String message);
+    public void closeAsync(AMQConstant cause, String message);
 
     public void block();
 
@@ -53,7 +53,7 @@ public interface AMQConnectionModel<T ex
      * @param cause
      * @param message
      */
-    public void closeSession(S session, AMQConstant cause, String message);
+    public void closeSessionAsync(S session, AMQConstant cause, String message);
 
     public long getConnectionId();
 
@@ -107,4 +107,8 @@ public interface AMQConnectionModel<T ex
 
     void removeSessionListener(SessionModelListener listener);
 
+    void notifyWork();
+
+    boolean isMessageAssignmentSuspended();
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Thu Mar 12 15:41:46 2015
@@ -113,4 +113,8 @@ public interface AMQSessionModel<T exten
      * @return the time of the last activity or 0 if not in a transaction
      */
     long getTransactionUpdateTime();
+
+    void transportStateChanged();
+
+    void processPending();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Thu Mar 12 15:41:46 2015
@@ -24,40 +24,31 @@ package org.apache.qpid.server.protocol;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.security.Principal;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.security.auth.Subject;
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender;
-import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
 public class MultiVersionProtocolEngine implements ServerProtocolEngine
 {
     private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
 
     private final long _id;
-    private final SSLContext _sslContext;
-    private final boolean _wantClientAuth;
-    private final boolean _needClientAuth;
     private final AmqpPort<?> _port;
-    private final Transport _transport;
+    private Transport _transport;
     private final ProtocolEngineCreator[] _creators;
     private final Runnable _onCloseTask;
 
@@ -65,15 +56,13 @@ public class MultiVersionProtocolEngine
     private String _fqdn;
     private final Broker<?> _broker;
     private NetworkConnection _network;
-    private Sender<ByteBuffer> _sender;
+    private ByteBufferSender _sender;
     private final Protocol _defaultSupportedReply;
 
     private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+    private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
 
     public MultiVersionProtocolEngine(final Broker<?> broker,
-                                      SSLContext sslContext,
-                                      boolean wantClientAuth,
-                                      boolean needClientAuth,
                                       final Set<Protocol> supported,
                                       final Protocol defaultSupportedReply,
                                       AmqpPort<?> port,
@@ -92,15 +81,23 @@ public class MultiVersionProtocolEngine
         _broker = broker;
         _supported = supported;
         _defaultSupportedReply = defaultSupportedReply;
-        _sslContext = sslContext;
-        _wantClientAuth = wantClientAuth;
-        _needClientAuth = needClientAuth;
         _port = port;
         _transport = transport;
         _creators = creators;
         _onCloseTask = onCloseTask;
     }
 
+    @Override
+    public void setMessageAssignmentSuspended(final boolean value)
+    {
+        _delegate.setMessageAssignmentSuspended(value);
+    }
+
+    @Override
+    public boolean isMessageAssignmentSuspended()
+    {
+        return _delegate.isMessageAssignmentSuspended();
+    }
 
     public SocketAddress getRemoteAddress()
     {
@@ -147,6 +144,12 @@ public class MultiVersionProtocolEngine
         _delegate.readerIdle();
     }
 
+    @Override
+    public void encryptedTransport()
+    {
+        _delegate.encryptedTransport();
+    }
+
 
     public void received(ByteBuffer msg)
     {
@@ -169,9 +172,21 @@ public class MultiVersionProtocolEngine
         return _delegate.getSubject();
     }
 
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return _delegate.isTransportBlockedForWriting();
+    }
+
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+        _delegate.setTransportBlockedForWriting(blocked);
+    }
+
     private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
 
-    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+    public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
     {
         _network = network;
         SocketAddress address = _network.getLocalAddress();
@@ -198,10 +213,82 @@ public class MultiVersionProtocolEngine
         return _delegate.getLastWriteTime();
     }
 
+    @Override
+    public void processPending()
+    {
+        _delegate.processPending();
+    }
+
+    @Override
+    public boolean hasWork()
+    {
+        return _delegate.hasWork();
+    }
+
+    @Override
+    public void notifyWork()
+    {
+        _delegate.notifyWork();
+    }
 
+    @Override
+    public void setWorkListener(final Action<ServerProtocolEngine> listener)
+    {
+        _workListener.set(listener);
+        _delegate.setWorkListener(listener);
+    }
+
+    @Override
+    public void clearWork()
+    {
+        _delegate.clearWork();
+    }
 
     private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
     {
+
+        @Override
+        public void setMessageAssignmentSuspended(final boolean value)
+        {
+
+        }
+
+        @Override
+        public boolean isMessageAssignmentSuspended()
+        {
+            return false;
+        }
+
+        @Override
+        public void processPending()
+        {
+
+        }
+
+        @Override
+        public boolean hasWork()
+        {
+            return false;
+        }
+
+        @Override
+        public void notifyWork()
+        {
+
+        }
+
+        @Override
+        public void setWorkListener(final Action<ServerProtocolEngine> listener)
+        {
+
+        }
+
+        @Override
+        public void clearWork()
+        {
+
+        }
+
         public SocketAddress getRemoteAddress()
         {
             return _network.getRemoteAddress();
@@ -247,7 +334,13 @@ public class MultiVersionProtocolEngine
 
         }
 
-        public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+        @Override
+        public void encryptedTransport()
+        {
+
+        }
+
+        public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
         {
 
         }
@@ -274,12 +367,24 @@ public class MultiVersionProtocolEngine
         {
             return new Subject();
         }
+
+        @Override
+        public boolean isTransportBlockedForWriting()
+        {
+            return false;
+        }
+
+        @Override
+        public void setTransportBlockedForWriting(final boolean blocked)
+        {
+        }
     }
 
     private class SelfDelegateProtocolEngine implements ServerProtocolEngine
     {
         private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
-        private long _lastReadTime;
+        private long _lastReadTime = System.currentTimeMillis();
+        private final AtomicBoolean _hasWork = new AtomicBoolean();
 
         public SocketAddress getRemoteAddress()
         {
@@ -301,6 +406,47 @@ public class MultiVersionProtocolEngine
             return 0;
         }
 
+        @Override
+        public void setMessageAssignmentSuspended(final boolean value)
+        {
+        }
+
+        @Override
+        public boolean isMessageAssignmentSuspended()
+        {
+            return false;
+        }
+
+        @Override
+        public void processPending()
+        {
+
+        }
+
+        @Override
+        public boolean hasWork()
+        {
+            return _hasWork.get();
+        }
+
+        @Override
+        public void notifyWork()
+        {
+            _hasWork.set(true);
+        }
+
+        @Override
+        public void setWorkListener(final Action<ServerProtocolEngine> listener)
+        {
+
+        }
+
+        @Override
+        public void clearWork()
+        {
+            _hasWork.set(false);
+        }
+
         public void received(ByteBuffer msg)
         {
             _lastReadTime = System.currentTimeMillis();
@@ -360,15 +506,6 @@ public class MultiVersionProtocolEngine
                     }
                 }
 
-
-                if(newDelegate == null && looksLikeSSL(headerBytes))
-                {
-                    if(_sslContext !=  null)
-                    {
-                        newDelegate = new SslDelegateProtocolEngine();
-                    }
-                }
-
                 // If no delegate is found then send back a supported protocol version id
                 if(newDelegate == null)
                 {
@@ -398,8 +535,13 @@ public class MultiVersionProtocolEngine
                 }
                 else
                 {
+                    boolean hasWork = _delegate.hasWork();
+                    if (hasWork)
+                    {
+                        newDelegate.notifyWork();
+                    }
                     _delegate = newDelegate;
-
+                    _delegate.setWorkListener(_workListener.get());
                     _header.flip();
                     _delegate.received(_header);
                     if(msg.hasRemaining())
@@ -423,6 +565,17 @@ public class MultiVersionProtocolEngine
             return _delegate.getSubject();
         }
 
+        @Override
+        public boolean isTransportBlockedForWriting()
+        {
+            return false;
+        }
+
+        @Override
+        public void setTransportBlockedForWriting(final boolean blocked)
+        {
+        }
+
         public void exception(Throwable t)
         {
             _logger.error("Error establishing session", t);
@@ -466,132 +619,18 @@ public class MultiVersionProtocolEngine
             _network.close();
         }
 
-        public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
-        {
-
-        }
-
-        @Override
-        public long getLastReadTime()
-        {
-            return _lastReadTime;
-        }
-
         @Override
-        public long getLastWriteTime()
+        public void encryptedTransport()
         {
-            return 0;
-        }
-    }
-
-    private class SslDelegateProtocolEngine implements ServerProtocolEngine
-    {
-        private final MultiVersionProtocolEngine _decryptEngine;
-        private final SSLEngine _engine;
-        private final SSLReceiver _sslReceiver;
-        private final SSLBufferingSender _sslSender;
-        private long _lastReadTime;
-
-        private SslDelegateProtocolEngine()
-        {
-
-            _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported,
-                                                            _defaultSupportedReply, _port, Transport.SSL, _id, _creators,
-                                                            null);
-
-            _engine = _sslContext.createSSLEngine();
-            _engine.setUseClientMode(false);
-            SSLUtil.removeSSLv3Support(_engine);
-            SSLUtil.updateEnabledCipherSuites(_engine, _port.getEnabledCipherSuites(), _port.getDisabledCipherSuites());
-
-            if(_needClientAuth)
-            {
-                _engine.setNeedClientAuth(true);
-            }
-            else if(_wantClientAuth)
+            if(_transport == Transport.TCP)
             {
-                _engine.setWantClientAuth(true);
+                _transport = Transport.SSL;
             }
-
-            SSLStatus sslStatus = new SSLStatus();
-            _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus);
-            _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus);
-            _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender), _sslSender);
-        }
-
-        @Override
-        public void received(ByteBuffer msg)
-        {
-            _lastReadTime = System.currentTimeMillis();
-            _sslReceiver.received(msg);
-            _sslSender.send();
-            _sslSender.flush();
-        }
-
-        @Override
-        public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
-        {
-            //TODO - Implement
-        }
-
-        @Override
-        public SocketAddress getRemoteAddress()
-        {
-            return _decryptEngine.getRemoteAddress();
         }
 
-        @Override
-        public SocketAddress getLocalAddress()
+        public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
         {
-            return _decryptEngine.getLocalAddress();
-        }
 
-        @Override
-        public long getWrittenBytes()
-        {
-            return _decryptEngine.getWrittenBytes();
-        }
-
-        @Override
-        public long getReadBytes()
-        {
-            return _decryptEngine.getReadBytes();
-        }
-
-        @Override
-        public void closed()
-        {
-            _decryptEngine.closed();
-        }
-
-        @Override
-        public void writerIdle()
-        {
-            _decryptEngine.writerIdle();
-        }
-
-        @Override
-        public void readerIdle()
-        {
-            _decryptEngine.readerIdle();
-        }
-
-        @Override
-        public void exception(Throwable t)
-        {
-            _decryptEngine.exception(t);
-        }
-
-        @Override
-        public long getConnectionId()
-        {
-            return _decryptEngine.getConnectionId();
-        }
-
-        @Override
-        public Subject getSubject()
-        {
-            return _decryptEngine.getSubject();
         }
 
         @Override
@@ -603,132 +642,9 @@ public class MultiVersionProtocolEngine
         @Override
         public long getLastWriteTime()
         {
-            return _decryptEngine.getLastWriteTime();
+            return 0;
         }
     }
 
-    private boolean looksLikeSSL(byte[] headerBytes)
-    {
-        return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
-    }
-
-    private boolean looksLikeSSLv3ClientHello(byte[] headerBytes)
-    {
-        return headerBytes[0] == 22 && // SSL Handshake
-               (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
-                (headerBytes[2] == 0 || // SSL 3.0
-                 headerBytes[2] == 1 || // TLS 1.0
-                 headerBytes[2] == 2 || // TLS 1.1
-                 headerBytes[2] == 3)) && // TLS1.2
-               (headerBytes[5] == 1); // client_hello
-    }
-
-    private boolean looksLikeSSLv2ClientHello(byte[] headerBytes)
-    {
-        return headerBytes[0] == -128 &&
-               headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
-                (headerBytes[4] == 0 || // SSL 3.0
-                 headerBytes[4] == 1 || // TLS 1.0
-                 headerBytes[4] == 2 || // TLS 1.1
-                 headerBytes[4] == 3);
-    }
-
-
-    private static class SSLNetworkConnection implements NetworkConnection
-    {
-        private final NetworkConnection _network;
-        private final SSLBufferingSender _sslSender;
-        private final SSLEngine _engine;
-        private Principal _principal;
-        private boolean _principalChecked;
-        private final Object _lock = new Object();
-
-        public SSLNetworkConnection(SSLEngine engine, NetworkConnection network,
-                                    SSLBufferingSender sslSender)
-        {
-            _engine = engine;
-            _network = network;
-            _sslSender = sslSender;
-
-        }
-
-        @Override
-        public Sender<ByteBuffer> getSender()
-        {
-            return _sslSender;
-        }
-
-        @Override
-        public void start()
-        {
-            _network.start();
-        }
-
-        @Override
-        public void close()
-        {
-            _sslSender.close();
-
-            _network.close();
-        }
-
-        @Override
-        public SocketAddress getRemoteAddress()
-        {
-            return _network.getRemoteAddress();
-        }
-
-        @Override
-        public SocketAddress getLocalAddress()
-        {
-            return _network.getLocalAddress();
-        }
-
-        @Override
-        public void setMaxWriteIdle(int sec)
-        {
-            _network.setMaxWriteIdle(sec);
-        }
 
-        @Override
-        public void setMaxReadIdle(int sec)
-        {
-            _network.setMaxReadIdle(sec);
-        }
-
-        @Override
-        public Principal getPeerPrincipal()
-        {
-            synchronized (_lock)
-            {
-                if(!_principalChecked)
-                {
-                    try
-                    {
-                        _principal =  _engine.getSession().getPeerPrincipal();
-                    }
-                    catch (SSLPeerUnverifiedException e)
-                    {
-                        _principal = null;
-                    }
-
-                    _principalChecked = true;
-                }
-
-                return _principal;
-            }
-        }
-
-        @Override
-        public int getMaxReadIdle()
-        {
-            return _network.getMaxReadIdle();
-        }
-
-        @Override
-        public int getMaxWriteIdle()
-        {
-            return _network.getMaxWriteIdle();
-        }
-    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Thu Mar 12 15:41:46 2015
@@ -27,10 +27,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.net.ssl.SSLContext;
-
 import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.logging.messages.PortMessages;
 import org.apache.qpid.server.logging.subjects.PortLogSubject;
 import org.apache.qpid.server.model.Broker;
@@ -48,9 +45,6 @@ public class MultiVersionProtocolEngineF
     private final Broker<?> _broker;
     private final Set<Protocol> _supported;
     private final Protocol _defaultSupportedReply;
-    private final SSLContext _sslContext;
-    private final boolean _wantClientAuth;
-    private final boolean _needClientAuth;
     private final AmqpPort<?> _port;
     private final Transport _transport;
     private final ProtocolEngineCreator[] _creators;
@@ -58,9 +52,6 @@ public class MultiVersionProtocolEngineF
             _connectionCountDecrementingTask = new ConnectionCountDecrementingTask();
 
     public MultiVersionProtocolEngineFactory(Broker<?> broker,
-                                             SSLContext sslContext,
-                                             boolean wantClientAuth,
-                                             boolean needClientAuth,
                                              final Set<Protocol> supportedVersions,
                                              final Protocol defaultSupportedReply,
                                              AmqpPort<?> port,
@@ -73,7 +64,6 @@ public class MultiVersionProtocolEngineF
         }
 
         _broker = broker;
-        _sslContext = sslContext;
         _supported = supportedVersions;
         _defaultSupportedReply = defaultSupportedReply;
         final List<ProtocolEngineCreator> creators = new ArrayList<ProtocolEngineCreator>();
@@ -83,18 +73,16 @@ public class MultiVersionProtocolEngineF
         }
         Collections.sort(creators, new ProtocolEngineCreatorComparator());
         _creators = creators.toArray(new ProtocolEngineCreator[creators.size()]);
-        _wantClientAuth = wantClientAuth;
-        _needClientAuth = needClientAuth;
         _port = port;
         _transport = transport;
     }
 
-    public ServerProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
+    public MultiVersionProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
     {
         if(_port.canAcceptNewConnection(remoteSocketAddress))
         {
             _port.incrementConnectionCount();
-            return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth,
+            return new MultiVersionProtocolEngine(_broker,
                                                   _supported, _defaultSupportedReply, _port, _transport,
                                                   ID_GENERATOR.getAndIncrement(),
                                                   _creators, _connectionCountDecrementingTask);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Mar 12 15:41:46 2015
@@ -43,11 +43,15 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.server.binding.BindingImpl;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.Task;
+import org.apache.qpid.server.configuration.updater.TaskWithException;
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -96,6 +100,7 @@ import org.apache.qpid.server.util.MapVa
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.transport.TransportException;
 
 public abstract class AbstractQueue<X extends AbstractQueue<X>>
@@ -642,16 +647,51 @@ public abstract class AbstractQueue<X ex
 
 
     @Override
-    public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target,
-                                     FilterManager filters,
+    public QueueConsumerImpl addConsumer(final ConsumerTarget target,
+                                     final FilterManager filters,
                                      final Class<? extends ServerMessage> messageClass,
                                      final String consumerName,
-                                     EnumSet<ConsumerImpl.Option> optionSet)
+                                     final EnumSet<ConsumerImpl.Option> optionSet)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
                    ConsumerAccessRefused
     {
 
+        try
+        {
+            return getTaskExecutor().run(new TaskWithException<QueueConsumerImpl, Exception>()
+            {
+
+                @Override
+                public QueueConsumerImpl execute()
+                        throws Exception
+                {
+
+                    return addConsumerInternal(target, filters, messageClass, consumerName, optionSet);
+                }
+            });
+        }
+        catch (ExistingExclusiveConsumer | ConsumerAccessRefused |
+            ExistingConsumerPreventsExclusive | RuntimeException e)
+        {
+            throw e;
+        }
+        catch (Exception e)
+        {
+            // Should never happen
+            throw new ServerScopedRuntimeException(e);
+        }
+
 
+    }
+
+    private QueueConsumerImpl addConsumerInternal(final ConsumerTarget target,
+                                                  FilterManager filters,
+                                                  final Class<? extends ServerMessage> messageClass,
+                                                  final String consumerName,
+                                                  EnumSet<ConsumerImpl.Option> optionSet)
+            throws ExistingExclusiveConsumer, ConsumerAccessRefused,
+                   ExistingConsumerPreventsExclusive
+    {
         if (hasExclusiveConsumer())
         {
             throw new ExistingExclusiveConsumer();
@@ -763,7 +803,7 @@ public abstract class AbstractQueue<X ex
         QueueConsumerImpl consumer = new QueueConsumerImpl(this,
                                                            target,
                                                            consumerName,
-                                                           filters, 
+                                                           filters,
                                                            messageClass,
                                                            optionSet);
 
@@ -812,19 +852,18 @@ public abstract class AbstractQueue<X ex
         deliverAsync();
 
         return consumer;
-
     }
 
     @Override
-    protected void beforeClose()
+    protected ListenableFuture<Void> beforeClose()
     {
         _closing = true;
-        super.beforeClose();
+        return super.beforeClose();
     }
 
 
 
-    synchronized void unregisterConsumer(final QueueConsumerImpl consumer)
+    void unregisterConsumer(final QueueConsumerImpl consumer)
     {
         if (consumer == null)
         {
@@ -835,7 +874,7 @@ public abstract class AbstractQueue<X ex
 
         if (removed)
         {
-            consumer.close();
+            consumer.closeAsync();
             // No longer can the queue have an exclusive consumer
             setExclusiveSubscriber(null);
 
@@ -1219,10 +1258,6 @@ public abstract class AbstractQueue<X ex
                     else
                     {
                         deliverMessage(sub, entry, false);
-                        if(sub.acquires())
-                        {
-                            entry.unlockAcquisition();
-                        }
                     }
                 }
             }
@@ -1798,7 +1833,15 @@ public abstract class AbstractQueue<X ex
 
             for (BindingImpl b : bindingCopy)
             {
-                b.delete();
+                // TODO - RG - Need to sort out bindings!
+                if(getTaskExecutor().isTaskExecutorThread())
+                {
+                    b.deleteAsync();
+                }
+                else
+                {
+                    b.delete();
+                }
             }
 
             QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
@@ -1851,7 +1894,7 @@ public abstract class AbstractQueue<X ex
             }
 
             _deleteTaskList.clear();
-            close();
+            closeAsync();
             deleted();
             //Log Queue Deletion
             getEventLogger().message(_logSubject, QueueMessages.DELETED());
@@ -2050,10 +2093,6 @@ public abstract class AbstractQueue<X ex
                         else
                         {
                             deliverMessage(sub, node, batch);
-                            if(sub.acquires())
-                            {
-                                node.unlockAcquisition();
-                            }
                         }
 
                     }
@@ -2221,7 +2260,8 @@ public abstract class AbstractQueue<X ex
                             if (consumerDone)
                             {
                                 sub.flushBatched();
-                                if (lastLoop && getNextAvailableEntry(sub) == null)
+                                boolean noMore = getNextAvailableEntry(sub) == null;
+                                if (lastLoop && noMore)
                                 {
                                     sub.queueEmpty();
                                 }
@@ -2595,7 +2635,7 @@ public abstract class AbstractQueue<X ex
     {
         if (_virtualHost.getState() != State.ACTIVE)
         {
-            throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+            throw new VirtualHostUnavailableException(this._virtualHost);
         }
 
         if(!message.isReferenced(this))
@@ -2660,7 +2700,7 @@ public abstract class AbstractQueue<X ex
         return allowed;
     }
 
-    private synchronized void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy)
+    private void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy)
             throws ExistingConsumerPreventsExclusive
     {
         if(desiredPolicy == null)
@@ -2862,24 +2902,27 @@ public abstract class AbstractQueue<X ex
     //=============
 
     @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED)
-    private void doDeleteBeforeInitialize()
+    private ListenableFuture<Void> doDeleteBeforeInitialize()
     {
         preSetAlternateExchange();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         _virtualHost.removeQueue(this);
         preSetAlternateExchange();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Thu Mar 12 15:41:46 2015
@@ -52,5 +52,4 @@ public interface QueueConsumer<X extends
 
     QueueContext getQueueContext();
 
-    ConsumerTarget getTarget();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Thu Mar 12 15:41:46 2015
@@ -198,7 +198,7 @@ class QueueConsumerImpl
 
         if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get())
         {
-            close();
+            closeAsync();
         }
         final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener();
         if(stateListener != null)
@@ -323,6 +323,7 @@ class QueueConsumerImpl
     public final void flush()
     {
         _queue.flushConsumer(this);
+        _target.processPending();
     }
 
     public boolean resend(final QueueEntry entry)
@@ -514,6 +515,7 @@ class QueueConsumerImpl
         return _selector;
     }
 
+
     @Override
     public String toLogString()
     {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java Thu Mar 12 15:41:46 2015
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
 
 import java.util.Map;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObjectFactory;
 import org.apache.qpid.server.model.Port;
@@ -49,6 +51,14 @@ public class QueueFactory<X extends Queu
     }
 
     @Override
+    public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+                                           final Map<String, Object> attributes,
+                                           final ConfiguredObject<?>... parents)
+    {
+        return getQueueFactory(factory, attributes).createAsync(factory, attributes, parents);
+    }
+
+    @Override
     public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
                                                  final ConfiguredObjectRecord record,
                                                  final ConfiguredObject<?>... parents)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java Thu Mar 12 15:41:46 2015
@@ -37,6 +37,9 @@ import java.util.Set;
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Broker;
@@ -96,7 +99,7 @@ public class FileKeyStoreImpl extends Ab
     }
 
     @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
-    protected void doDelete()
+    protected ListenableFuture<Void> doDelete()
     {
         // verify that it is not in use
         String storeName = getName();
@@ -111,12 +114,14 @@ public class FileKeyStoreImpl extends Ab
         }
         deleted();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
-    protected void doActivate()
+    protected ListenableFuture<Void> doActivate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java Thu Mar 12 15:41:46 2015
@@ -37,6 +37,9 @@ import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 import javax.net.ssl.X509TrustManager;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.AuthenticationProvider;
@@ -96,7 +99,7 @@ public class FileTrustStoreImpl extends
     }
 
     @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
-    protected void doDelete()
+    protected ListenableFuture<Void> doDelete()
     {
         // verify that it is not in use
         String storeName = getName();
@@ -137,12 +140,14 @@ public class FileTrustStoreImpl extends
         }
         deleted();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
-    protected void doActivate()
+    protected ListenableFuture<Void> doActivate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java Thu Mar 12 15:41:46 2015
@@ -56,6 +56,8 @@ import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 import javax.xml.bind.DatatypeConverter;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -181,7 +183,7 @@ public class NonJavaKeyStoreImpl extends
     }
 
     @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
-    protected void doDelete()
+    protected ListenableFuture<Void> doDelete()
     {
         // verify that it is not in use
         String storeName = getName();
@@ -199,12 +201,14 @@ public class NonJavaKeyStoreImpl extends
         }
         deleted();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
-    protected void doActivate()
+    protected ListenableFuture<Void> doActivate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java Thu Mar 12 15:41:46 2015
@@ -44,6 +44,8 @@ import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 import javax.security.auth.x500.X500Principal;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -169,7 +171,7 @@ public class NonJavaTrustStoreImpl
     }
 
     @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
-    protected void doDelete()
+    protected ListenableFuture<Void> doDelete()
     {
         // verify that it is not in use
         String storeName = getName();
@@ -212,12 +214,14 @@ public class NonJavaTrustStoreImpl
         }
         deleted();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
-    protected void doActivate()
+    protected ListenableFuture<Void> doActivate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java Thu Mar 12 15:41:46 2015
@@ -32,6 +32,7 @@ import java.security.AccessController;
 import java.security.Principal;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java Thu Mar 12 15:41:46 2015
@@ -27,6 +27,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -151,13 +154,14 @@ public abstract class AbstractAuthentica
     }
 
     @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED )
-    protected void startQuiesced()
+    protected ListenableFuture<Void> startQuiesced()
     {
         setState(State.QUIESCED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.QUIESCED }, desiredState = State.ACTIVE )
-    protected void activate()
+    protected ListenableFuture<Void> activate()
     {
         try
         {
@@ -175,11 +179,11 @@ public abstract class AbstractAuthentica
                 throw e;
             }
         }
-
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED)
-    protected void doDelete()
+    protected ListenableFuture<Void> doDelete()
     {
 
         String providerName = getName();
@@ -195,15 +199,50 @@ public abstract class AbstractAuthentica
             }
         }
 
-        close();
-        if (_preferencesProvider != null)
-        {
-            _preferencesProvider.delete();
-        }
-        deleted();
+        final SettableFuture<Void> returnVal = SettableFuture.create();
 
-        setState(State.DELETED);
+        final ListenableFuture<Void> future = closeAsync();
+        future.addListener(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                if (_preferencesProvider != null)
+                {
+                    _preferencesProvider.deleteAsync().addListener(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            try
+                            {
+                                deleted();
+                                setState(State.DELETED);
+                            }
+                            finally
+                            {
+                                returnVal.set(null);
+                            }
+                        }
+                    }, getTaskExecutor().getExecutor());
+                }
+                else
+                {
+                    try
+                    {
+                        deleted();
+
+                        setState(State.DELETED);
+                    }
+                    finally
+                    {
+                        returnVal.set(null);
+                    }
+                }
+            }
+        }, getTaskExecutor().getExecutor());
 
+        return  returnVal;
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java Thu Mar 12 15:41:46 2015
@@ -92,22 +92,15 @@ public abstract class ConfigModelPasswor
     @Override
     public void deleteUser(final String user) throws AccountNotFoundException
     {
-        runTask(new VoidTaskWithException<AccountNotFoundException>()
+        final ManagedUser authUser = getUser(user);
+        if(authUser != null)
         {
-            @Override
-            public void execute() throws AccountNotFoundException
-            {
-                final ManagedUser authUser = getUser(user);
-                if(authUser != null)
-                {
-                    authUser.delete();
-                }
-                else
-                {
-                    throw new AccountNotFoundException("No such user: '" + user + "'");
-                }
-            }
-        });
+            authUser.delete();
+        }
+        else
+        {
+            throw new AccountNotFoundException("No such user: '" + user + "'");
+        }
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java Thu Mar 12 15:41:46 2015
@@ -27,6 +27,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.configuration.updater.VoidTask;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObject;
@@ -85,10 +88,11 @@ class ManagedUser extends AbstractConfig
     }
 
     @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         _authenticationManager.getUserMap().remove(getName());
         deleted();
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Thu Mar 12 15:41:46 2015
@@ -40,6 +40,9 @@ import javax.security.auth.login.Account
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.BrokerProperties;
@@ -119,16 +122,9 @@ public abstract class PrincipalDatabaseA
         super.onOpen();
         _principalDatabase = createDatabase();
         initialise();
-        List<Principal> users = _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers();
-        for (Principal user : users)
-        {
-            PrincipalAdapter principalAdapter = new PrincipalAdapter(user);
-            principalAdapter.registerWithParents();
-            principalAdapter.open();
-            _userMap.put(user, principalAdapter);
-        }
     }
 
+
     protected abstract PrincipalDatabase createDatabase();
 
 
@@ -217,9 +213,44 @@ public abstract class PrincipalDatabaseA
         return _principalDatabase;
     }
 
+    @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
+    public ListenableFuture<Void> activate()
+    {
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        final List<Principal> users = _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers();
+        _userMap.clear();
+        if(!users.isEmpty())
+        {
+            for (final Principal user : users)
+            {
+                final PrincipalAdapter principalAdapter = new PrincipalAdapter(user);
+                principalAdapter.registerWithParents();
+                principalAdapter.openAsync().addListener(new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        _userMap.put(user, principalAdapter);
+                        if (_userMap.size() == users.size())
+                        {
+                            setState(State.ACTIVE);
+                            returnVal.set(null);
+                        }
+                    }
+                }, getTaskExecutor().getExecutor());
+
+            }
+
+            return returnVal;
+        }
+        else
+        {
+            return Futures.immediateFuture(null);
+        }
+    }
 
-    @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED)
-    public void doDelete()
+    @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED, State.UNINITIALIZED}, desiredState = State.DELETED)
+    public ListenableFuture<Void> doDelete()
     {
         File file = new File(_path);
         if (file.exists() && file.isFile())
@@ -228,6 +259,7 @@ public abstract class PrincipalDatabaseA
         }
         deleted();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
     @Override
@@ -479,13 +511,14 @@ public abstract class PrincipalDatabaseA
         }
 
         @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
-        private void activate()
+        private ListenableFuture<Void> activate()
         {
             setState(State.ACTIVE);
+            return Futures.immediateFuture(null);
         }
 
         @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
-        private void doDelete()
+        private ListenableFuture<Void> doDelete()
         {
             try
             {
@@ -503,7 +536,7 @@ public abstract class PrincipalDatabaseA
             {
                 LOGGER.warn("Failed to delete user " + _user, e);
             }
-
+            return Futures.immediateFuture(null);
         }
 
         @Override

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java Thu Mar 12 15:41:46 2015
@@ -22,6 +22,9 @@ package org.apache.qpid.server.security.
 
 import java.util.Map;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Group;
@@ -77,16 +80,18 @@ public class GroupImpl extends AbstractC
 
 
     @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
 
     @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         deleted();
+        return Futures.immediateFuture(null);
     }
 
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java Thu Mar 12 15:41:46 2015
@@ -23,6 +23,9 @@ package org.apache.qpid.server.security.
 import java.security.Principal;
 import java.util.Map;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Group;
 import org.apache.qpid.server.model.GroupMember;
@@ -61,15 +64,17 @@ public class GroupMemberImpl extends Abs
 
 
     @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
 
     @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         deleted();
+        return Futures.immediateFuture(null);
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java Thu Mar 12 15:41:46 2015
@@ -26,6 +26,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
@@ -89,16 +92,18 @@ public class GroupProviderImpl extends A
     }
 
     @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
 
     @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         deleted();
+        return Futures.immediateFuture(null);
     }
 
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Thu Mar 12 15:41:46 2015
@@ -45,6 +45,7 @@ import org.apache.qpid.server.plugin.Mes
 import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
 import org.apache.qpid.server.store.handler.MessageHandler;
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.util.FutureResult;
 
 public abstract class AbstractJDBCMessageStore implements MessageStore
 {
@@ -834,10 +835,10 @@ public abstract class AbstractJDBCMessag
         }
     }
 
-    private StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
+    private FutureResult commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
     {
         commitTran(connWrapper);
-        return StoreFuture.IMMEDIATE_FUTURE;
+        return FutureResult.IMMEDIATE_FUTURE;
     }
 
     private void abortTran(ConnectionWrapper connWrapper) throws StoreException
@@ -1231,14 +1232,14 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public StoreFuture commitTranAsync()
+        public FutureResult commitTranAsync()
         {
             checkMessageStoreOpen();
             doPreCommitActions();
-            StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
+            FutureResult futureResult = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
             storedSizeChange(_storeSizeIncrease);
             doPostCommitActions();
-            return storeFuture;
+            return futureResult;
         }
 
         private void doPreCommitActions()

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Thu Mar 12 15:41:46 2015
@@ -35,6 +35,7 @@ import org.apache.qpid.server.model.Conf
 import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
 import org.apache.qpid.server.store.handler.MessageHandler;
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.util.FutureResult;
 
 /** A simple message store that stores the messages in a thread-safe structure in memory. */
 public class MemoryMessageStore implements MessageStore
@@ -58,9 +59,9 @@ public class MemoryMessageStore implemen
         private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
 
         @Override
-        public StoreFuture commitTranAsync()
+        public FutureResult commitTranAsync()
         {
-            return StoreFuture.IMMEDIATE_FUTURE;
+            return FutureResult.IMMEDIATE_FUTURE;
         }
 
         @Override

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java Thu Mar 12 15:41:46 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.store;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.util.FutureResult;
 
 public interface Transaction
 {
@@ -53,7 +54,7 @@ public interface Transaction
      * Commits all operations performed within a given transactional context.
      *
      */
-    StoreFuture commitTranAsync();
+    FutureResult commitTranAsync();
 
     /**
      * Abandons all operations performed within a given transactional context.
@@ -72,4 +73,4 @@ public interface Transaction
 
     void recordXid(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues,
                    Transaction.Record[] dequeues);
-}
\ No newline at end of file
+}

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java Thu Mar 12 15:41:46 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport
 import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
 
 import java.net.InetSocketAddress;
+import java.util.EnumSet;
 import java.util.Collection;
 import java.util.Set;
 
@@ -34,11 +35,11 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.TransportEncryption;
 
 class TCPandSSLTransport implements AcceptingTransport
 {
-    private IncomingNetworkTransport _networkTransport;
+    private NonBlockingNetworkTransport _networkTransport;
     private Set<Transport> _transports;
     private SSLContext _sslContext;
     private InetSocketAddress _bindingSocketAddress;
@@ -62,7 +63,7 @@ class TCPandSSLTransport implements Acce
     @Override
     public void start()
     {
-        String bindingAddress = ((AmqpPort<?>)_port).getBindingAddress();
+        String bindingAddress = _port.getBindingAddress();
         if (WILDCARD_ADDRESS.equals(bindingAddress))
         {
             bindingAddress = null;
@@ -78,17 +79,25 @@ class TCPandSSLTransport implements Acce
         }
 
         final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration();
-        _networkTransport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
+        _networkTransport = new NonBlockingNetworkTransport();
         final MultiVersionProtocolEngineFactory protocolEngineFactory =
                 new MultiVersionProtocolEngineFactory(
-                _port.getParent(Broker.class), _transports.contains(Transport.TCP) ? _sslContext : null,
-                settings.wantClientAuth(), settings.needClientAuth(),
+                _port.getParent(Broker.class),
                 _supported,
                 _defaultSupportedProtocolReply,
                 _port,
                 _transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL);
 
-        _networkTransport.accept(settings, protocolEngineFactory, _transports.contains(Transport.TCP) ? null : _sslContext);
+        EnumSet<TransportEncryption> encryptionSet = EnumSet.noneOf(TransportEncryption.class);
+        if(_transports.contains(Transport.TCP))
+        {
+            encryptionSet.add(TransportEncryption.NONE);
+        }
+        if(_transports.contains(Transport.SSL))
+        {
+            encryptionSet.add(TransportEncryption.TLS);
+        }
+        _networkTransport.accept(settings, protocolEngineFactory, _sslContext, encryptionSet);
     }
 
     public int getAcceptingPort()

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Thu Mar 12 15:41:46 2015
@@ -30,7 +30,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogResource;
 
@@ -55,7 +55,7 @@ public class AsyncAutoCommitTransaction
 
     public static interface FutureRecorder
     {
-        public void recordFuture(StoreFuture future, Action action);
+        public void recordFuture(FutureResult future, Action action);
 
     }
 
@@ -83,7 +83,7 @@ public class AsyncAutoCommitTransaction
      */
     public void addPostTransactionAction(final Action immediateAction)
     {
-        addFuture(StoreFuture.IMMEDIATE_FUTURE, immediateAction);
+        addFuture(FutureResult.IMMEDIATE_FUTURE, immediateAction);
 
     }
 
@@ -92,7 +92,7 @@ public class AsyncAutoCommitTransaction
         Transaction txn = null;
         try
         {
-            StoreFuture future;
+            FutureResult future;
             if(queue.getMessageDurability().persist(message.isPersistent()))
             {
                 if (_logger.isDebugEnabled())
@@ -108,7 +108,7 @@ public class AsyncAutoCommitTransaction
             }
             else
             {
-                future = StoreFuture.IMMEDIATE_FUTURE;
+                future = FutureResult.IMMEDIATE_FUTURE;
             }
             addFuture(future, postTransactionAction);
             postTransactionAction = null;
@@ -120,7 +120,7 @@ public class AsyncAutoCommitTransaction
 
     }
 
-    private void addFuture(final StoreFuture future, final Action action)
+    private void addFuture(final FutureResult future, final Action action)
     {
         if(action != null)
         {
@@ -135,7 +135,7 @@ public class AsyncAutoCommitTransaction
         }
     }
 
-    private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent)
+    private void addEnqueueFuture(final FutureResult future, final Action action, boolean persistent)
     {
         if(action != null)
         {
@@ -178,7 +178,7 @@ public class AsyncAutoCommitTransaction
                 }
 
             }
-            StoreFuture future;
+            FutureResult future;
             if(txn != null)
             {
                 future = txn.commitTranAsync();
@@ -186,7 +186,7 @@ public class AsyncAutoCommitTransaction
             }
             else
             {
-                future = StoreFuture.IMMEDIATE_FUTURE;
+                future = FutureResult.IMMEDIATE_FUTURE;
             }
             addFuture(future, postTransactionAction);
             postTransactionAction = null;
@@ -204,7 +204,7 @@ public class AsyncAutoCommitTransaction
         Transaction txn = null;
         try
         {
-            StoreFuture future;
+            FutureResult future;
             if(queue.getMessageDurability().persist(message.isPersistent()))
             {
                 if (_logger.isDebugEnabled())
@@ -219,7 +219,7 @@ public class AsyncAutoCommitTransaction
             }
             else
             {
-                future = StoreFuture.IMMEDIATE_FUTURE;
+                future = FutureResult.IMMEDIATE_FUTURE;
             }
             addEnqueueFuture(future, postTransactionAction, message.isPersistent());
             postTransactionAction = null;
@@ -255,7 +255,7 @@ public class AsyncAutoCommitTransaction
                 }
             }
 
-            StoreFuture future;
+            FutureResult future;
             if (txn != null)
             {
                 future = txn.commitTranAsync();
@@ -263,7 +263,7 @@ public class AsyncAutoCommitTransaction
             }
             else
             {
-                future = StoreFuture.IMMEDIATE_FUTURE;
+                future = FutureResult.IMMEDIATE_FUTURE;
             }
             addEnqueueFuture(future, postTransactionAction, message.isPersistent());
             postTransactionAction = null;
@@ -281,7 +281,7 @@ public class AsyncAutoCommitTransaction
     {
         if(immediatePostTransactionAction != null)
         {
-            addFuture(StoreFuture.IMMEDIATE_FUTURE, new Action()
+            addFuture(FutureResult.IMMEDIATE_FUTURE, new Action()
             {
                 public void postCommit()
                 {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Thu Mar 12 15:41:46 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.server.txn;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +33,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogResource;
 
@@ -53,7 +54,7 @@ public class LocalTransaction implements
     private final MessageStore _transactionLog;
     private volatile long _txnStartTime = 0L;
     private volatile long _txnUpdateTime = 0l;
-    private StoreFuture _asyncTran;
+    private FutureResult _asyncTran;
 
     public LocalTransaction(MessageStore transactionLog)
     {
@@ -271,16 +272,16 @@ public class LocalTransaction implements
         }
     }
 
-    public StoreFuture commitAsync(final Runnable deferred)
+    public FutureResult commitAsync(final Runnable deferred)
     {
         sync();
-        StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
+        FutureResult future = FutureResult.IMMEDIATE_FUTURE;
         if(_transaction != null)
         {
-            future = new StoreFuture()
+            future = new FutureResult()
                         {
                             private volatile boolean _completed = false;
-                            private StoreFuture _underlying = _transaction.commitTranAsync();
+                            private FutureResult _underlying = _transaction.commitTranAsync();
 
                             @Override
                             public boolean isComplete()
@@ -297,6 +298,17 @@ public class LocalTransaction implements
                                     checkUnderlyingCompletion();
                                 }
                             }
+
+                            @Override
+                            public void waitForCompletion(final long timeout) throws TimeoutException
+                            {
+
+                                if(!_completed)
+                                {
+                                    _underlying.waitForCompletion(timeout);
+                                    checkUnderlyingCompletion();
+                                }
+                            }
 
                             private synchronized boolean checkUnderlyingCompletion()
                             {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org