You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/03/12 16:41:48 UTC
svn commit: r1666224 [3/7] - in /qpid/trunk: ./ qpid/ qpid/cpp/src/
qpid/cpp/src/qpid/broker/ qpid/cpp/src/qpid/management/ qpid/cpp/src/tests/
qpid/java/
qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
qpid/java/bdbstore/src/...
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java Thu Mar 12 15:41:46 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model.por
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -108,6 +110,14 @@ public class PortFactory<X extends Port<
}
@Override
+ public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ return getPortFactory(factory, attributes, (Broker<?>)parents[0]).createAsync(factory, attributes,parents);
+ }
+
+ @Override
public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
final ConfiguredObjectRecord record,
final ConfiguredObject<?>... parents)
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java Thu Mar 12 15:41:46 2015
@@ -20,19 +20,23 @@
*/
package org.apache.qpid.server.plugin;
+import java.util.Map;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
-import java.util.Map;
-
public interface ConfiguredObjectTypeFactory<X extends ConfiguredObject<X>> extends Pluggable
{
Class<? super X> getCategoryClass();
X create(final ConfiguredObjectFactory factory, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+ ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+
UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
ConfiguredObjectRecord record,
ConfiguredObject<?>... parents);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java Thu Mar 12 15:41:46 2015
@@ -19,7 +19,7 @@ package org.apache.qpid.server.plugin;/*
*
*/
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Thu Mar 12 15:41:46 2015
@@ -40,7 +40,7 @@ public interface AMQConnectionModel<T ex
* @param cause
* @param message
*/
- public void close(AMQConstant cause, String message);
+ public void closeAsync(AMQConstant cause, String message);
public void block();
@@ -53,7 +53,7 @@ public interface AMQConnectionModel<T ex
* @param cause
* @param message
*/
- public void closeSession(S session, AMQConstant cause, String message);
+ public void closeSessionAsync(S session, AMQConstant cause, String message);
public long getConnectionId();
@@ -107,4 +107,8 @@ public interface AMQConnectionModel<T ex
void removeSessionListener(SessionModelListener listener);
+ void notifyWork();
+
+ boolean isMessageAssignmentSuspended();
+
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Thu Mar 12 15:41:46 2015
@@ -113,4 +113,8 @@ public interface AMQSessionModel<T exten
* @return the time of the last activity or 0 if not in a transaction
*/
long getTransactionUpdateTime();
+
+ void transportStateChanged();
+
+ void processPending();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Thu Mar 12 15:41:46 2015
@@ -24,40 +24,31 @@ package org.apache.qpid.server.protocol;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.security.Principal;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender;
-import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
private final long _id;
- private final SSLContext _sslContext;
- private final boolean _wantClientAuth;
- private final boolean _needClientAuth;
private final AmqpPort<?> _port;
- private final Transport _transport;
+ private Transport _transport;
private final ProtocolEngineCreator[] _creators;
private final Runnable _onCloseTask;
@@ -65,15 +56,13 @@ public class MultiVersionProtocolEngine
private String _fqdn;
private final Broker<?> _broker;
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private final Protocol _defaultSupportedReply;
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+ private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
public MultiVersionProtocolEngine(final Broker<?> broker,
- SSLContext sslContext,
- boolean wantClientAuth,
- boolean needClientAuth,
final Set<Protocol> supported,
final Protocol defaultSupportedReply,
AmqpPort<?> port,
@@ -92,15 +81,23 @@ public class MultiVersionProtocolEngine
_broker = broker;
_supported = supported;
_defaultSupportedReply = defaultSupportedReply;
- _sslContext = sslContext;
- _wantClientAuth = wantClientAuth;
- _needClientAuth = needClientAuth;
_port = port;
_transport = transport;
_creators = creators;
_onCloseTask = onCloseTask;
}
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+ _delegate.setMessageAssignmentSuspended(value);
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _delegate.isMessageAssignmentSuspended();
+ }
public SocketAddress getRemoteAddress()
{
@@ -147,6 +144,12 @@ public class MultiVersionProtocolEngine
_delegate.readerIdle();
}
+ @Override
+ public void encryptedTransport()
+ {
+ _delegate.encryptedTransport();
+ }
+
public void received(ByteBuffer msg)
{
@@ -169,9 +172,21 @@ public class MultiVersionProtocolEngine
return _delegate.getSubject();
}
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _delegate.isTransportBlockedForWriting();
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _delegate.setTransportBlockedForWriting(blocked);
+ }
+
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
_network = network;
SocketAddress address = _network.getLocalAddress();
@@ -198,10 +213,82 @@ public class MultiVersionProtocolEngine
return _delegate.getLastWriteTime();
}
+ @Override
+ public void processPending()
+ {
+ _delegate.processPending();
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return _delegate.hasWork();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _delegate.notifyWork();
+ }
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+ _workListener.set(listener);
+ _delegate.setWorkListener(listener);
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _delegate.clearWork();
+ }
private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
{
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return false;
+ }
+
+ @Override
+ public void processPending()
+ {
+
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return false;
+ }
+
+ @Override
+ public void notifyWork()
+ {
+
+ }
+
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+
+ }
+
+ @Override
+ public void clearWork()
+ {
+
+ }
+
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
@@ -247,7 +334,13 @@ public class MultiVersionProtocolEngine
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ @Override
+ public void encryptedTransport()
+ {
+
+ }
+
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
}
@@ -274,12 +367,24 @@ public class MultiVersionProtocolEngine
{
return new Subject();
}
+
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return false;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ }
}
private class SelfDelegateProtocolEngine implements ServerProtocolEngine
{
private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
- private long _lastReadTime;
+ private long _lastReadTime = System.currentTimeMillis();
+ private final AtomicBoolean _hasWork = new AtomicBoolean();
public SocketAddress getRemoteAddress()
{
@@ -301,6 +406,47 @@ public class MultiVersionProtocolEngine
return 0;
}
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return false;
+ }
+
+ @Override
+ public void processPending()
+ {
+
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return _hasWork.get();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _hasWork.set(true);
+ }
+
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _hasWork.set(false);
+ }
+
public void received(ByteBuffer msg)
{
_lastReadTime = System.currentTimeMillis();
@@ -360,15 +506,6 @@ public class MultiVersionProtocolEngine
}
}
-
- if(newDelegate == null && looksLikeSSL(headerBytes))
- {
- if(_sslContext != null)
- {
- newDelegate = new SslDelegateProtocolEngine();
- }
- }
-
// If no delegate is found then send back a supported protocol version id
if(newDelegate == null)
{
@@ -398,8 +535,13 @@ public class MultiVersionProtocolEngine
}
else
{
+ boolean hasWork = _delegate.hasWork();
+ if (hasWork)
+ {
+ newDelegate.notifyWork();
+ }
_delegate = newDelegate;
-
+ _delegate.setWorkListener(_workListener.get());
_header.flip();
_delegate.received(_header);
if(msg.hasRemaining())
@@ -423,6 +565,17 @@ public class MultiVersionProtocolEngine
return _delegate.getSubject();
}
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return false;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ }
+
public void exception(Throwable t)
{
_logger.error("Error establishing session", t);
@@ -466,132 +619,18 @@ public class MultiVersionProtocolEngine
_network.close();
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
-
- }
-
- @Override
- public long getLastReadTime()
- {
- return _lastReadTime;
- }
-
@Override
- public long getLastWriteTime()
+ public void encryptedTransport()
{
- return 0;
- }
- }
-
- private class SslDelegateProtocolEngine implements ServerProtocolEngine
- {
- private final MultiVersionProtocolEngine _decryptEngine;
- private final SSLEngine _engine;
- private final SSLReceiver _sslReceiver;
- private final SSLBufferingSender _sslSender;
- private long _lastReadTime;
-
- private SslDelegateProtocolEngine()
- {
-
- _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported,
- _defaultSupportedReply, _port, Transport.SSL, _id, _creators,
- null);
-
- _engine = _sslContext.createSSLEngine();
- _engine.setUseClientMode(false);
- SSLUtil.removeSSLv3Support(_engine);
- SSLUtil.updateEnabledCipherSuites(_engine, _port.getEnabledCipherSuites(), _port.getDisabledCipherSuites());
-
- if(_needClientAuth)
- {
- _engine.setNeedClientAuth(true);
- }
- else if(_wantClientAuth)
+ if(_transport == Transport.TCP)
{
- _engine.setWantClientAuth(true);
+ _transport = Transport.SSL;
}
-
- SSLStatus sslStatus = new SSLStatus();
- _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus);
- _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus);
- _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender), _sslSender);
- }
-
- @Override
- public void received(ByteBuffer msg)
- {
- _lastReadTime = System.currentTimeMillis();
- _sslReceiver.received(msg);
- _sslSender.send();
- _sslSender.flush();
- }
-
- @Override
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
- //TODO - Implement
- }
-
- @Override
- public SocketAddress getRemoteAddress()
- {
- return _decryptEngine.getRemoteAddress();
}
- @Override
- public SocketAddress getLocalAddress()
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
- return _decryptEngine.getLocalAddress();
- }
- @Override
- public long getWrittenBytes()
- {
- return _decryptEngine.getWrittenBytes();
- }
-
- @Override
- public long getReadBytes()
- {
- return _decryptEngine.getReadBytes();
- }
-
- @Override
- public void closed()
- {
- _decryptEngine.closed();
- }
-
- @Override
- public void writerIdle()
- {
- _decryptEngine.writerIdle();
- }
-
- @Override
- public void readerIdle()
- {
- _decryptEngine.readerIdle();
- }
-
- @Override
- public void exception(Throwable t)
- {
- _decryptEngine.exception(t);
- }
-
- @Override
- public long getConnectionId()
- {
- return _decryptEngine.getConnectionId();
- }
-
- @Override
- public Subject getSubject()
- {
- return _decryptEngine.getSubject();
}
@Override
@@ -603,132 +642,9 @@ public class MultiVersionProtocolEngine
@Override
public long getLastWriteTime()
{
- return _decryptEngine.getLastWriteTime();
+ return 0;
}
}
- private boolean looksLikeSSL(byte[] headerBytes)
- {
- return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
- }
-
- private boolean looksLikeSSLv3ClientHello(byte[] headerBytes)
- {
- return headerBytes[0] == 22 && // SSL Handshake
- (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[2] == 0 || // SSL 3.0
- headerBytes[2] == 1 || // TLS 1.0
- headerBytes[2] == 2 || // TLS 1.1
- headerBytes[2] == 3)) && // TLS1.2
- (headerBytes[5] == 1); // client_hello
- }
-
- private boolean looksLikeSSLv2ClientHello(byte[] headerBytes)
- {
- return headerBytes[0] == -128 &&
- headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[4] == 0 || // SSL 3.0
- headerBytes[4] == 1 || // TLS 1.0
- headerBytes[4] == 2 || // TLS 1.1
- headerBytes[4] == 3);
- }
-
-
- private static class SSLNetworkConnection implements NetworkConnection
- {
- private final NetworkConnection _network;
- private final SSLBufferingSender _sslSender;
- private final SSLEngine _engine;
- private Principal _principal;
- private boolean _principalChecked;
- private final Object _lock = new Object();
-
- public SSLNetworkConnection(SSLEngine engine, NetworkConnection network,
- SSLBufferingSender sslSender)
- {
- _engine = engine;
- _network = network;
- _sslSender = sslSender;
-
- }
-
- @Override
- public Sender<ByteBuffer> getSender()
- {
- return _sslSender;
- }
-
- @Override
- public void start()
- {
- _network.start();
- }
-
- @Override
- public void close()
- {
- _sslSender.close();
-
- _network.close();
- }
-
- @Override
- public SocketAddress getRemoteAddress()
- {
- return _network.getRemoteAddress();
- }
-
- @Override
- public SocketAddress getLocalAddress()
- {
- return _network.getLocalAddress();
- }
-
- @Override
- public void setMaxWriteIdle(int sec)
- {
- _network.setMaxWriteIdle(sec);
- }
- @Override
- public void setMaxReadIdle(int sec)
- {
- _network.setMaxReadIdle(sec);
- }
-
- @Override
- public Principal getPeerPrincipal()
- {
- synchronized (_lock)
- {
- if(!_principalChecked)
- {
- try
- {
- _principal = _engine.getSession().getPeerPrincipal();
- }
- catch (SSLPeerUnverifiedException e)
- {
- _principal = null;
- }
-
- _principalChecked = true;
- }
-
- return _principal;
- }
- }
-
- @Override
- public int getMaxReadIdle()
- {
- return _network.getMaxReadIdle();
- }
-
- @Override
- public int getMaxWriteIdle()
- {
- return _network.getMaxWriteIdle();
- }
- }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Thu Mar 12 15:41:46 2015
@@ -27,10 +27,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import javax.net.ssl.SSLContext;
-
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.PortMessages;
import org.apache.qpid.server.logging.subjects.PortLogSubject;
import org.apache.qpid.server.model.Broker;
@@ -48,9 +45,6 @@ public class MultiVersionProtocolEngineF
private final Broker<?> _broker;
private final Set<Protocol> _supported;
private final Protocol _defaultSupportedReply;
- private final SSLContext _sslContext;
- private final boolean _wantClientAuth;
- private final boolean _needClientAuth;
private final AmqpPort<?> _port;
private final Transport _transport;
private final ProtocolEngineCreator[] _creators;
@@ -58,9 +52,6 @@ public class MultiVersionProtocolEngineF
_connectionCountDecrementingTask = new ConnectionCountDecrementingTask();
public MultiVersionProtocolEngineFactory(Broker<?> broker,
- SSLContext sslContext,
- boolean wantClientAuth,
- boolean needClientAuth,
final Set<Protocol> supportedVersions,
final Protocol defaultSupportedReply,
AmqpPort<?> port,
@@ -73,7 +64,6 @@ public class MultiVersionProtocolEngineF
}
_broker = broker;
- _sslContext = sslContext;
_supported = supportedVersions;
_defaultSupportedReply = defaultSupportedReply;
final List<ProtocolEngineCreator> creators = new ArrayList<ProtocolEngineCreator>();
@@ -83,18 +73,16 @@ public class MultiVersionProtocolEngineF
}
Collections.sort(creators, new ProtocolEngineCreatorComparator());
_creators = creators.toArray(new ProtocolEngineCreator[creators.size()]);
- _wantClientAuth = wantClientAuth;
- _needClientAuth = needClientAuth;
_port = port;
_transport = transport;
}
- public ServerProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
+ public MultiVersionProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
{
if(_port.canAcceptNewConnection(remoteSocketAddress))
{
_port.incrementConnectionCount();
- return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth,
+ return new MultiVersionProtocolEngine(_broker,
_supported, _defaultSupportedReply, _port, _transport,
ID_GENERATOR.getAndIncrement(),
_creators, _connectionCountDecrementingTask);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Mar 12 15:41:46 2015
@@ -43,11 +43,15 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.Task;
+import org.apache.qpid.server.configuration.updater.TaskWithException;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -96,6 +100,7 @@ import org.apache.qpid.server.util.MapVa
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.TransportException;
public abstract class AbstractQueue<X extends AbstractQueue<X>>
@@ -642,16 +647,51 @@ public abstract class AbstractQueue<X ex
@Override
- public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target,
- FilterManager filters,
+ public QueueConsumerImpl addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- EnumSet<ConsumerImpl.Option> optionSet)
+ final EnumSet<ConsumerImpl.Option> optionSet)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused
{
+ try
+ {
+ return getTaskExecutor().run(new TaskWithException<QueueConsumerImpl, Exception>()
+ {
+
+ @Override
+ public QueueConsumerImpl execute()
+ throws Exception
+ {
+
+ return addConsumerInternal(target, filters, messageClass, consumerName, optionSet);
+ }
+ });
+ }
+ catch (ExistingExclusiveConsumer | ConsumerAccessRefused |
+ ExistingConsumerPreventsExclusive | RuntimeException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ // Should never happen
+ throw new ServerScopedRuntimeException(e);
+ }
+
+ }
+
+ private QueueConsumerImpl addConsumerInternal(final ConsumerTarget target,
+ FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ EnumSet<ConsumerImpl.Option> optionSet)
+ throws ExistingExclusiveConsumer, ConsumerAccessRefused,
+ ExistingConsumerPreventsExclusive
+ {
if (hasExclusiveConsumer())
{
throw new ExistingExclusiveConsumer();
@@ -763,7 +803,7 @@ public abstract class AbstractQueue<X ex
QueueConsumerImpl consumer = new QueueConsumerImpl(this,
target,
consumerName,
- filters,
+ filters,
messageClass,
optionSet);
@@ -812,19 +852,18 @@ public abstract class AbstractQueue<X ex
deliverAsync();
return consumer;
-
}
@Override
- protected void beforeClose()
+ protected ListenableFuture<Void> beforeClose()
{
_closing = true;
- super.beforeClose();
+ return super.beforeClose();
}
- synchronized void unregisterConsumer(final QueueConsumerImpl consumer)
+ void unregisterConsumer(final QueueConsumerImpl consumer)
{
if (consumer == null)
{
@@ -835,7 +874,7 @@ public abstract class AbstractQueue<X ex
if (removed)
{
- consumer.close();
+ consumer.closeAsync();
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
@@ -1219,10 +1258,6 @@ public abstract class AbstractQueue<X ex
else
{
deliverMessage(sub, entry, false);
- if(sub.acquires())
- {
- entry.unlockAcquisition();
- }
}
}
}
@@ -1798,7 +1833,15 @@ public abstract class AbstractQueue<X ex
for (BindingImpl b : bindingCopy)
{
- b.delete();
+ // TODO - RG - Need to sort out bindings!
+ if(getTaskExecutor().isTaskExecutorThread())
+ {
+ b.deleteAsync();
+ }
+ else
+ {
+ b.delete();
+ }
}
QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
@@ -1851,7 +1894,7 @@ public abstract class AbstractQueue<X ex
}
_deleteTaskList.clear();
- close();
+ closeAsync();
deleted();
//Log Queue Deletion
getEventLogger().message(_logSubject, QueueMessages.DELETED());
@@ -2050,10 +2093,6 @@ public abstract class AbstractQueue<X ex
else
{
deliverMessage(sub, node, batch);
- if(sub.acquires())
- {
- node.unlockAcquisition();
- }
}
}
@@ -2221,7 +2260,8 @@ public abstract class AbstractQueue<X ex
if (consumerDone)
{
sub.flushBatched();
- if (lastLoop && getNextAvailableEntry(sub) == null)
+ boolean noMore = getNextAvailableEntry(sub) == null;
+ if (lastLoop && noMore)
{
sub.queueEmpty();
}
@@ -2595,7 +2635,7 @@ public abstract class AbstractQueue<X ex
{
if (_virtualHost.getState() != State.ACTIVE)
{
- throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+ throw new VirtualHostUnavailableException(this._virtualHost);
}
if(!message.isReferenced(this))
@@ -2660,7 +2700,7 @@ public abstract class AbstractQueue<X ex
return allowed;
}
- private synchronized void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy)
+ private void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy)
throws ExistingConsumerPreventsExclusive
{
if(desiredPolicy == null)
@@ -2862,24 +2902,27 @@ public abstract class AbstractQueue<X ex
//=============
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED)
- private void doDeleteBeforeInitialize()
+ private ListenableFuture<Void> doDeleteBeforeInitialize()
{
preSetAlternateExchange();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
_virtualHost.removeQueue(this);
preSetAlternateExchange();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Thu Mar 12 15:41:46 2015
@@ -52,5 +52,4 @@ public interface QueueConsumer<X extends
QueueContext getQueueContext();
- ConsumerTarget getTarget();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Thu Mar 12 15:41:46 2015
@@ -198,7 +198,7 @@ class QueueConsumerImpl
if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get())
{
- close();
+ closeAsync();
}
final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener();
if(stateListener != null)
@@ -323,6 +323,7 @@ class QueueConsumerImpl
public final void flush()
{
_queue.flushConsumer(this);
+ _target.processPending();
}
public boolean resend(final QueueEntry entry)
@@ -514,6 +515,7 @@ class QueueConsumerImpl
return _selector;
}
+
@Override
public String toLogString()
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java Thu Mar 12 15:41:46 2015
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
import java.util.Map;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.Port;
@@ -49,6 +51,14 @@ public class QueueFactory<X extends Queu
}
@Override
+ public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ return getQueueFactory(factory, attributes).createAsync(factory, attributes, parents);
+ }
+
+ @Override
public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory,
final ConfiguredObjectRecord record,
final ConfiguredObject<?>... parents)
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java Thu Mar 12 15:41:46 2015
@@ -37,6 +37,9 @@ import java.util.Set;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
@@ -96,7 +99,7 @@ public class FileKeyStoreImpl extends Ab
}
@StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
// verify that it is not in use
String storeName = getName();
@@ -111,12 +114,14 @@ public class FileKeyStoreImpl extends Ab
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java Thu Mar 12 15:41:46 2015
@@ -37,6 +37,9 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AuthenticationProvider;
@@ -96,7 +99,7 @@ public class FileTrustStoreImpl extends
}
@StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
// verify that it is not in use
String storeName = getName();
@@ -137,12 +140,14 @@ public class FileTrustStoreImpl extends
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java Thu Mar 12 15:41:46 2015
@@ -56,6 +56,8 @@ import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.xml.bind.DatatypeConverter;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -181,7 +183,7 @@ public class NonJavaKeyStoreImpl extends
}
@StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
// verify that it is not in use
String storeName = getName();
@@ -199,12 +201,14 @@ public class NonJavaKeyStoreImpl extends
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java Thu Mar 12 15:41:46 2015
@@ -44,6 +44,8 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.security.auth.x500.X500Principal;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -169,7 +171,7 @@ public class NonJavaTrustStoreImpl
}
@StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
// verify that it is not in use
String storeName = getName();
@@ -212,12 +214,14 @@ public class NonJavaTrustStoreImpl
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE)
- protected void doActivate()
+ protected ListenableFuture<Void> doActivate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java Thu Mar 12 15:41:46 2015
@@ -32,6 +32,7 @@ import java.security.AccessController;
import java.security.Principal;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java Thu Mar 12 15:41:46 2015
@@ -27,6 +27,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -151,13 +154,14 @@ public abstract class AbstractAuthentica
}
@StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED )
- protected void startQuiesced()
+ protected ListenableFuture<Void> startQuiesced()
{
setState(State.QUIESCED);
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.QUIESCED }, desiredState = State.ACTIVE )
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
try
{
@@ -175,11 +179,11 @@ public abstract class AbstractAuthentica
throw e;
}
}
-
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED)
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
String providerName = getName();
@@ -195,15 +199,50 @@ public abstract class AbstractAuthentica
}
}
- close();
- if (_preferencesProvider != null)
- {
- _preferencesProvider.delete();
- }
- deleted();
+ final SettableFuture<Void> returnVal = SettableFuture.create();
- setState(State.DELETED);
+ final ListenableFuture<Void> future = closeAsync();
+ future.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if (_preferencesProvider != null)
+ {
+ _preferencesProvider.deleteAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ deleted();
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor());
+ }
+ else
+ {
+ try
+ {
+ deleted();
+
+ setState(State.DELETED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }
+ }, getTaskExecutor().getExecutor());
+ return returnVal;
}
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java Thu Mar 12 15:41:46 2015
@@ -92,22 +92,15 @@ public abstract class ConfigModelPasswor
@Override
public void deleteUser(final String user) throws AccountNotFoundException
{
- runTask(new VoidTaskWithException<AccountNotFoundException>()
+ final ManagedUser authUser = getUser(user);
+ if(authUser != null)
{
- @Override
- public void execute() throws AccountNotFoundException
- {
- final ManagedUser authUser = getUser(user);
- if(authUser != null)
- {
- authUser.delete();
- }
- else
- {
- throw new AccountNotFoundException("No such user: '" + user + "'");
- }
- }
- });
+ authUser.delete();
+ }
+ else
+ {
+ throw new AccountNotFoundException("No such user: '" + user + "'");
+ }
}
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java Thu Mar 12 15:41:46 2015
@@ -27,6 +27,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.updater.VoidTask;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -85,10 +88,11 @@ class ManagedUser extends AbstractConfig
}
@StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
_authenticationManager.getUserMap().remove(getName());
deleted();
+ return Futures.immediateFuture(null);
}
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Thu Mar 12 15:41:46 2015
@@ -40,6 +40,9 @@ import javax.security.auth.login.Account
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -119,16 +122,9 @@ public abstract class PrincipalDatabaseA
super.onOpen();
_principalDatabase = createDatabase();
initialise();
- List<Principal> users = _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers();
- for (Principal user : users)
- {
- PrincipalAdapter principalAdapter = new PrincipalAdapter(user);
- principalAdapter.registerWithParents();
- principalAdapter.open();
- _userMap.put(user, principalAdapter);
- }
}
+
protected abstract PrincipalDatabase createDatabase();
@@ -217,9 +213,44 @@ public abstract class PrincipalDatabaseA
return _principalDatabase;
}
+ @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
+ public ListenableFuture<Void> activate()
+ {
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ final List<Principal> users = _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers();
+ _userMap.clear();
+ if(!users.isEmpty())
+ {
+ for (final Principal user : users)
+ {
+ final PrincipalAdapter principalAdapter = new PrincipalAdapter(user);
+ principalAdapter.registerWithParents();
+ principalAdapter.openAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _userMap.put(user, principalAdapter);
+ if (_userMap.size() == users.size())
+ {
+ setState(State.ACTIVE);
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor());
+
+ }
+
+ return returnVal;
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
+ }
+ }
- @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED)
- public void doDelete()
+ @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED, State.UNINITIALIZED}, desiredState = State.DELETED)
+ public ListenableFuture<Void> doDelete()
{
File file = new File(_path);
if (file.exists() && file.isFile())
@@ -228,6 +259,7 @@ public abstract class PrincipalDatabaseA
}
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@Override
@@ -479,13 +511,14 @@ public abstract class PrincipalDatabaseA
}
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
try
{
@@ -503,7 +536,7 @@ public abstract class PrincipalDatabaseA
{
LOGGER.warn("Failed to delete user " + _user, e);
}
-
+ return Futures.immediateFuture(null);
}
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java Thu Mar 12 15:41:46 2015
@@ -22,6 +22,9 @@ package org.apache.qpid.server.security.
import java.util.Map;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Group;
@@ -77,16 +80,18 @@ public class GroupImpl extends AbstractC
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
deleted();
+ return Futures.immediateFuture(null);
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java Thu Mar 12 15:41:46 2015
@@ -23,6 +23,9 @@ package org.apache.qpid.server.security.
import java.security.Principal;
import java.util.Map;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Group;
import org.apache.qpid.server.model.GroupMember;
@@ -61,15 +64,17 @@ public class GroupMemberImpl extends Abs
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
deleted();
+ return Futures.immediateFuture(null);
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java Thu Mar 12 15:41:46 2015
@@ -26,6 +26,9 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -89,16 +92,18 @@ public class GroupProviderImpl extends A
}
@StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
deleted();
+ return Futures.immediateFuture(null);
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Thu Mar 12 15:41:46 2015
@@ -45,6 +45,7 @@ import org.apache.qpid.server.plugin.Mes
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.util.FutureResult;
public abstract class AbstractJDBCMessageStore implements MessageStore
{
@@ -834,10 +835,10 @@ public abstract class AbstractJDBCMessag
}
}
- private StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
+ private FutureResult commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
{
commitTran(connWrapper);
- return StoreFuture.IMMEDIATE_FUTURE;
+ return FutureResult.IMMEDIATE_FUTURE;
}
private void abortTran(ConnectionWrapper connWrapper) throws StoreException
@@ -1231,14 +1232,14 @@ public abstract class AbstractJDBCMessag
}
@Override
- public StoreFuture commitTranAsync()
+ public FutureResult commitTranAsync()
{
checkMessageStoreOpen();
doPreCommitActions();
- StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
+ FutureResult futureResult = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
storedSizeChange(_storeSizeIncrease);
doPostCommitActions();
- return storeFuture;
+ return futureResult;
}
private void doPreCommitActions()
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Thu Mar 12 15:41:46 2015
@@ -35,6 +35,7 @@ import org.apache.qpid.server.model.Conf
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.util.FutureResult;
/** A simple message store that stores the messages in a thread-safe structure in memory. */
public class MemoryMessageStore implements MessageStore
@@ -58,9 +59,9 @@ public class MemoryMessageStore implemen
private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
@Override
- public StoreFuture commitTranAsync()
+ public FutureResult commitTranAsync()
{
- return StoreFuture.IMMEDIATE_FUTURE;
+ return FutureResult.IMMEDIATE_FUTURE;
}
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java Thu Mar 12 15:41:46 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.util.FutureResult;
public interface Transaction
{
@@ -53,7 +54,7 @@ public interface Transaction
* Commits all operations performed within a given transactional context.
*
*/
- StoreFuture commitTranAsync();
+ FutureResult commitTranAsync();
/**
* Abandons all operations performed within a given transactional context.
@@ -72,4 +73,4 @@ public interface Transaction
void recordXid(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues,
Transaction.Record[] dequeues);
-}
\ No newline at end of file
+}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java Thu Mar 12 15:41:46 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport
import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
import java.net.InetSocketAddress;
+import java.util.EnumSet;
import java.util.Collection;
import java.util.Set;
@@ -34,11 +35,11 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.TransportEncryption;
class TCPandSSLTransport implements AcceptingTransport
{
- private IncomingNetworkTransport _networkTransport;
+ private NonBlockingNetworkTransport _networkTransport;
private Set<Transport> _transports;
private SSLContext _sslContext;
private InetSocketAddress _bindingSocketAddress;
@@ -62,7 +63,7 @@ class TCPandSSLTransport implements Acce
@Override
public void start()
{
- String bindingAddress = ((AmqpPort<?>)_port).getBindingAddress();
+ String bindingAddress = _port.getBindingAddress();
if (WILDCARD_ADDRESS.equals(bindingAddress))
{
bindingAddress = null;
@@ -78,17 +79,25 @@ class TCPandSSLTransport implements Acce
}
final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration();
- _networkTransport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
+ _networkTransport = new NonBlockingNetworkTransport();
final MultiVersionProtocolEngineFactory protocolEngineFactory =
new MultiVersionProtocolEngineFactory(
- _port.getParent(Broker.class), _transports.contains(Transport.TCP) ? _sslContext : null,
- settings.wantClientAuth(), settings.needClientAuth(),
+ _port.getParent(Broker.class),
_supported,
_defaultSupportedProtocolReply,
_port,
_transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL);
- _networkTransport.accept(settings, protocolEngineFactory, _transports.contains(Transport.TCP) ? null : _sslContext);
+ EnumSet<TransportEncryption> encryptionSet = EnumSet.noneOf(TransportEncryption.class);
+ if(_transports.contains(Transport.TCP))
+ {
+ encryptionSet.add(TransportEncryption.NONE);
+ }
+ if(_transports.contains(Transport.SSL))
+ {
+ encryptionSet.add(TransportEncryption.TLS);
+ }
+ _networkTransport.accept(settings, protocolEngineFactory, _sslContext, encryptionSet);
}
public int getAcceptingPort()
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Thu Mar 12 15:41:46 2015
@@ -30,7 +30,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -55,7 +55,7 @@ public class AsyncAutoCommitTransaction
public static interface FutureRecorder
{
- public void recordFuture(StoreFuture future, Action action);
+ public void recordFuture(FutureResult future, Action action);
}
@@ -83,7 +83,7 @@ public class AsyncAutoCommitTransaction
*/
public void addPostTransactionAction(final Action immediateAction)
{
- addFuture(StoreFuture.IMMEDIATE_FUTURE, immediateAction);
+ addFuture(FutureResult.IMMEDIATE_FUTURE, immediateAction);
}
@@ -92,7 +92,7 @@ public class AsyncAutoCommitTransaction
Transaction txn = null;
try
{
- StoreFuture future;
+ FutureResult future;
if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
@@ -108,7 +108,7 @@ public class AsyncAutoCommitTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -120,7 +120,7 @@ public class AsyncAutoCommitTransaction
}
- private void addFuture(final StoreFuture future, final Action action)
+ private void addFuture(final FutureResult future, final Action action)
{
if(action != null)
{
@@ -135,7 +135,7 @@ public class AsyncAutoCommitTransaction
}
}
- private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent)
+ private void addEnqueueFuture(final FutureResult future, final Action action, boolean persistent)
{
if(action != null)
{
@@ -178,7 +178,7 @@ public class AsyncAutoCommitTransaction
}
}
- StoreFuture future;
+ FutureResult future;
if(txn != null)
{
future = txn.commitTranAsync();
@@ -186,7 +186,7 @@ public class AsyncAutoCommitTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -204,7 +204,7 @@ public class AsyncAutoCommitTransaction
Transaction txn = null;
try
{
- StoreFuture future;
+ FutureResult future;
if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
@@ -219,7 +219,7 @@ public class AsyncAutoCommitTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addEnqueueFuture(future, postTransactionAction, message.isPersistent());
postTransactionAction = null;
@@ -255,7 +255,7 @@ public class AsyncAutoCommitTransaction
}
}
- StoreFuture future;
+ FutureResult future;
if (txn != null)
{
future = txn.commitTranAsync();
@@ -263,7 +263,7 @@ public class AsyncAutoCommitTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addEnqueueFuture(future, postTransactionAction, message.isPersistent());
postTransactionAction = null;
@@ -281,7 +281,7 @@ public class AsyncAutoCommitTransaction
{
if(immediatePostTransactionAction != null)
{
- addFuture(StoreFuture.IMMEDIATE_FUTURE, new Action()
+ addFuture(FutureResult.IMMEDIATE_FUTURE, new Action()
{
public void postCommit()
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Thu Mar 12 15:41:46 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.server.txn;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +33,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -53,7 +54,7 @@ public class LocalTransaction implements
private final MessageStore _transactionLog;
private volatile long _txnStartTime = 0L;
private volatile long _txnUpdateTime = 0l;
- private StoreFuture _asyncTran;
+ private FutureResult _asyncTran;
public LocalTransaction(MessageStore transactionLog)
{
@@ -271,16 +272,16 @@ public class LocalTransaction implements
}
}
- public StoreFuture commitAsync(final Runnable deferred)
+ public FutureResult commitAsync(final Runnable deferred)
{
sync();
- StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
+ FutureResult future = FutureResult.IMMEDIATE_FUTURE;
if(_transaction != null)
{
- future = new StoreFuture()
+ future = new FutureResult()
{
private volatile boolean _completed = false;
- private StoreFuture _underlying = _transaction.commitTranAsync();
+ private FutureResult _underlying = _transaction.commitTranAsync();
@Override
public boolean isComplete()
@@ -297,6 +298,17 @@ public class LocalTransaction implements
checkUnderlyingCompletion();
}
}
+
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+
+ if(!_completed)
+ {
+ _underlying.waitForCompletion(timeout);
+ checkUnderlyingCompletion();
+ }
+ }
private synchronized boolean checkUnderlyingCompletion()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org