You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC
svn commit: r1451244 [30/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java Thu Feb 28 16:14:30 2013
@@ -22,13 +22,13 @@ package org.apache.qpid.server.security.
import java.net.SocketAddress;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import javax.management.remote.JMXAuthenticator;
-import javax.management.remote.JMXPrincipal;
import javax.security.auth.Subject;
public class RMIPasswordAuthenticator implements JMXAuthenticator
@@ -38,23 +38,33 @@ public class RMIPasswordAuthenticator im
static final String SHOULD_HAVE_2_ELEMENTS = "User details should have 2 elements, username, password";
static final String SHOULD_BE_NON_NULL = "Supplied username and password should be non-null";
static final String INVALID_CREDENTIALS = "Invalid user details supplied";
+ static final String USER_NOT_AUTHORISED_FOR_MANAGEMENT = "User not authorised for management";
static final String CREDENTIALS_REQUIRED = "User details are required. " +
- "Please ensure you are using an up to date management console to connect.";
+ "Please ensure you are using an up to date management console to connect.";
- private AuthenticationManager _authenticationManager = null;
- private SocketAddress _socketAddress;
+ private final Broker _broker;
+ private final SocketAddress _address;
- public RMIPasswordAuthenticator(SocketAddress socketAddress)
+ public RMIPasswordAuthenticator(Broker broker, SocketAddress address)
{
- _socketAddress = socketAddress;
+ _broker = broker;
+ _address = address;
}
- public void setAuthenticationManager(final AuthenticationManager authenticationManager)
+ public Subject authenticate(Object credentials) throws SecurityException
{
- _authenticationManager = authenticationManager;
+ validateCredentials(credentials);
+
+ final String[] userCredentials = (String[]) credentials;
+ final String username = (String) userCredentials[0];
+ final String password = (String) userCredentials[1];
+
+ final Subject authenticatedSubject = doAuthentication(username, password);
+ doManagementAuthorisation(authenticatedSubject);
+ return authenticatedSubject;
}
- public Subject authenticate(Object credentials) throws SecurityException
+ private void validateCredentials(Object credentials)
{
// Verify that credential's are of type String[].
if (!(credentials instanceof String[]))
@@ -70,41 +80,27 @@ public class RMIPasswordAuthenticator im
}
// Verify that required number of credentials.
- final String[] userCredentials = (String[]) credentials;
- if (userCredentials.length != 2)
+ if (((String[])credentials).length != 2)
{
throw new SecurityException(SHOULD_HAVE_2_ELEMENTS);
}
+ }
- final String username = (String) userCredentials[0];
- final String password = (String) userCredentials[1];
-
+ private Subject doAuthentication(final String username, final String password)
+ {
// Verify that all required credentials are actually present.
if (username == null || password == null)
{
throw new SecurityException(SHOULD_BE_NON_NULL);
}
- // Verify that an AuthenticationManager has been set.
- if (_authenticationManager == null)
+ SubjectCreator subjectCreator = _broker.getSubjectCreator(_address);
+ if (subjectCreator == null)
{
- try
- {
- if(ApplicationRegistry.getInstance().getAuthenticationManager(_socketAddress) != null)
- {
- _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager(_socketAddress);
- }
- else
- {
- throw new SecurityException(UNABLE_TO_LOOKUP);
- }
- }
- catch(IllegalStateException e)
- {
- throw new SecurityException(UNABLE_TO_LOOKUP);
- }
+ throw new SecurityException("Can't get subject creator for " + _address);
}
- final AuthenticationResult result = _authenticationManager.authenticate(username, password);
+
+ final SubjectAuthenticationResult result = subjectCreator.authenticate(username, password);
if (AuthenticationStatus.ERROR.equals(result.getStatus()))
{
@@ -112,10 +108,7 @@ public class RMIPasswordAuthenticator im
}
else if (AuthenticationStatus.SUCCESS.equals(result.getStatus()))
{
- final Subject subject = result.getSubject();
- subject.getPrincipals().add(new JMXPrincipal(username));
- subject.setReadOnly();
- return subject;
+ return result.getSubject();
}
else
{
@@ -123,4 +116,21 @@ public class RMIPasswordAuthenticator im
}
}
+ private void doManagementAuthorisation(Subject authenticatedSubject)
+ {
+ SecurityManager.setThreadSubject(authenticatedSubject);
+ try
+ {
+ if (!_broker.getSecurityManager().accessManagement())
+ {
+ throw new SecurityException(USER_NOT_AUTHORISED_FOR_MANAGEMENT);
+ }
+ }
+ finally
+ {
+ SecurityManager.setThreadSubject(null);
+ }
+ }
+
+
}
\ No newline at end of file
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java Thu Feb 28 16:14:30 2013
@@ -23,6 +23,7 @@ package org.apache.qpid.server.security.
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
import javax.security.auth.callback.Callback;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java Thu Feb 28 16:14:30 2013
@@ -23,6 +23,8 @@ package org.apache.qpid.server.security.
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
+
public class AnonymousSaslServer implements SaslServer
{
@@ -52,7 +54,7 @@ public class AnonymousSaslServer impleme
public String getAuthorizationID()
{
- return null;
+ return AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL.getName();
}
public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java Thu Feb 28 16:14:30 2013
@@ -139,6 +139,12 @@ public class CRAMMD5HexInitialiser exten
{
_realPricipalDatabase.reload();
}
+
+ @Override
+ public void setPasswordFile(String passwordFile) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
}
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Thu Feb 28 16:14:30 2013
@@ -31,10 +31,10 @@ import org.apache.qpid.framing.MethodDis
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -47,32 +47,29 @@ public class AMQStateManager implements
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
- private final VirtualHostRegistry _virtualHostRegistry;
+ private final Broker _broker;
private final AMQProtocolSession _protocolSession;
/** The current state */
private AMQState _currentState;
private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
- public AMQStateManager(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
+ public AMQStateManager(Broker broker, AMQProtocolSession protocolSession)
{
-
- _virtualHostRegistry = virtualHostRegistry;
+ _broker = broker;
_protocolSession = protocolSession;
_currentState = AMQState.CONNECTION_NOT_STARTED;
}
/**
- * Get the ApplicationRegistry associated with this AMQStateManager
- *
- * returns the application registry associated with the VirtualHostRegistry of the AMQStateManager
+ * Get the Broker instance
*
- * @return the ApplicationRegistry
+ * @return the Broker
*/
- public IApplicationRegistry getApplicationRegistry()
+ public Broker getBroker()
{
- return _virtualHostRegistry.getApplicationRegistry();
+ return _broker;
}
public AMQState getCurrentState()
@@ -148,7 +145,7 @@ public class AMQStateManager implements
public VirtualHostRegistry getVirtualHostRegistry()
{
- return _virtualHostRegistry;
+ return _broker.getVirtualHostRegistry();
}
public AMQProtocolSession getProtocolSession()
@@ -157,13 +154,9 @@ public class AMQStateManager implements
return _protocolSession;
}
- /**
- * Get the AuthenticationManager associated with the ProtocolSession of the AMQStateManager
- *
- * @return the AuthenticationManager
- */
- public AuthenticationManager getAuthenticationManager()
+
+ public SubjectCreator getSubjectCreator()
{
- return getApplicationRegistry().getAuthenticationManager(getProtocolSession().getLocalAddress());
+ return _broker.getSubjectCreator(getProtocolSession().getLocalAddress());
}
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Thu Feb 28 16:14:30 2013
@@ -46,19 +46,7 @@ public interface ConfigurationRecoveryHa
public static interface BindingRecoveryHandler
{
void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf);
- BrokerLinkRecoveryHandler completeBindingRecovery();
- }
-
- public static interface BrokerLinkRecoveryHandler
- {
- BridgeRecoveryHandler brokerLink(UUID id, long createTime, Map<String,String> arguments);
- void completeBrokerLinkRecovery();
- }
-
- public static interface BridgeRecoveryHandler
- {
- void bridge(UUID id, long createTime, Map<String,String> arguments);
- void completeBridgeRecoveryForLink();
+ void completeBindingRecovery();
}
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Thu Feb 28 16:14:30 2013
@@ -26,8 +26,6 @@ import org.apache.qpid.AMQStoreException
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.queue.AMQQueue;
public interface DurableConfigurationStore
@@ -122,12 +120,5 @@ public interface DurableConfigurationSto
* @throws AMQStoreException If the operation fails for any reason.
*/
void updateQueue(AMQQueue queue) throws AMQStoreException;
-
- void createBrokerLink(BrokerLink link) throws AMQStoreException;
-
- void deleteBrokerLink(BrokerLink link) throws AMQStoreException;
-
- void createBridge(Bridge bridge) throws AMQStoreException;
-
- void deleteBridge(Bridge bridge) throws AMQStoreException;
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Thu Feb 28 16:14:30 2013
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.Atomi
/** A simple message store that stores the messages in a thread-safe structure in memory. */
public class MemoryMessageStore extends NullMessageStore
{
+ public static final String TYPE = "Memory";
private final AtomicLong _messageId = new AtomicLong(1);
private final AtomicBoolean _closed = new AtomicBoolean(false);
@@ -138,6 +139,6 @@ public class MemoryMessageStore extends
@Override
public String getStoreType()
{
- return "Memory";
+ return TYPE;
}
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Thu Feb 28 16:14:30 2013
@@ -24,8 +24,6 @@ import org.apache.qpid.AMQStoreException
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.queue.AMQQueue;
public abstract class NullMessageStore implements MessageStore
@@ -78,26 +76,6 @@ public abstract class NullMessageStore i
}
@Override
- public void createBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- }
-
- @Override
- public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- }
-
- @Override
- public void createBridge(final Bridge bridge) throws AMQStoreException
- {
- }
-
- @Override
- public void deleteBridge(final Bridge bridge) throws AMQStoreException
- {
- }
-
- @Override
public void configureMessageStore(String name,
MessageStoreRecoveryHandler recoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java Thu Feb 28 16:14:30 2013
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-
public enum State
{
/** The initial state of the store. In practice, the store immediately transitions to the subsequent states. */
@@ -30,7 +28,7 @@ public enum State
INITIALISING,
/**
* The initial set-up of the store has completed.
- * If the store is persistent, it has not yet loaded configuration for {@link ConfiguredObject}'s from disk.
+ * If the store is persistent, it has not yet loaded configuration from disk.
*
* From the point of view of the user, the store is essentially stopped.
*/
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java Thu Feb 28 16:14:30 2013
@@ -23,7 +23,6 @@ package org.apache.qpid.server.store.der
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
@@ -41,7 +40,6 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -55,12 +53,9 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectHelper;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.Event;
@@ -236,7 +231,7 @@ public class DerbyMessageStore implement
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
- private static final String DERBY_STORE_TYPE = "DERBY";
+ public static final String TYPE = "DERBY";
private final StateManager _stateManager;
@@ -572,8 +567,7 @@ public class DerbyMessageStore implement
BindingRecoveryHandler brh = qrh.completeQueueRecovery();
_configuredObjectHelper.recoverBindings(brh, configuredObjects);
- BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
- recoverBrokerLinks(lrh);
+ brh.completeBindingRecovery();
}
catch (SQLException e)
{
@@ -581,144 +575,6 @@ public class DerbyMessageStore implement
}
}
- private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
- throws SQLException
- {
- _logger.info("Recovering broker links...");
-
- Connection conn = null;
- try
- {
- conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_LINKS);
-
- try
- {
- ResultSet rs = stmt.executeQuery();
-
- try
- {
-
- while(rs.next())
- {
- UUID id = new UUID(rs.getLong(2), rs.getLong(1));
- long createTime = rs.getLong(3);
- Blob argumentsAsBlob = rs.getBlob(4);
-
- byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
-
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
- int size = dis.readInt();
-
- Map<String,String> arguments = new HashMap<String, String>();
-
- for(int i = 0; i < size; i++)
- {
- arguments.put(dis.readUTF(), dis.readUTF());
- }
-
- ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments);
-
- recoverBridges(brh, id);
-
- }
- }
- catch (IOException e)
- {
- throw new SQLException(e.getMessage(), e);
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
- }
- }
-
- }
-
- private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
- throws SQLException
- {
- _logger.info("Recovering bridges for link " + linkId + "...");
-
- Connection conn = null;
- try
- {
- conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_BRIDGES);
-
- try
- {
- stmt.setLong(1, linkId.getLeastSignificantBits());
- stmt.setLong(2, linkId.getMostSignificantBits());
-
- ResultSet rs = stmt.executeQuery();
-
- try
- {
-
- while(rs.next())
- {
- UUID id = new UUID(rs.getLong(2), rs.getLong(1));
- long createTime = rs.getLong(3);
- Blob argumentsAsBlob = rs.getBlob(6);
-
- byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
-
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
- int size = dis.readInt();
-
- Map<String,String> arguments = new HashMap<String, String>();
-
- for(int i = 0; i < size; i++)
- {
- arguments.put(dis.readUTF(), dis.readUTF());
- }
-
- brh.bridge(id, createTime, arguments);
-
- }
- brh.completeBridgeRecoveryForLink();
- }
- catch (IOException e)
- {
- throw new SQLException(e.getMessage(), e);
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
- }
- }
-
- }
-
@Override
public void close() throws Exception
{
@@ -975,71 +831,6 @@ public class DerbyMessageStore implement
}
}
- @Override
- public void createBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- _logger.debug("public void createBrokerLink(BrokerLink = " + link + "): called");
-
- if (_stateManager.isInState(State.ACTIVE))
- {
- try
- {
- Connection conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(FIND_LINK);
- try
- {
-
- stmt.setLong(1, link.getQMFId().getLeastSignificantBits());
- stmt.setLong(2, link.getQMFId().getMostSignificantBits());
- ResultSet rs = stmt.executeQuery();
- try
- {
-
- // If we don't have any data in the result set then we can add this queue
- if (!rs.next())
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_LINKS);
-
- try
- {
-
- insertStmt.setLong(1, link.getQMFId().getLeastSignificantBits());
- insertStmt.setLong(2, link.getQMFId().getMostSignificantBits());
- insertStmt.setLong(3, link.getCreateTime());
-
- byte[] argumentBytes = convertStringMapToBytes(link.getArguments());
- ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
-
- insertStmt.setBinaryStream(4,bis,argumentBytes.length);
-
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- conn.close();
-
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error writing " + link + " to database: " + e.getMessage(), e);
- }
- }
- }
-
private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
{
byte[] argumentBytes;
@@ -1072,139 +863,7 @@ public class DerbyMessageStore implement
return argumentBytes;
}
- @Override
- public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- _logger.debug("public void deleteBrokerLink( " + link + "): called");
- Connection conn = null;
- PreparedStatement stmt = null;
- try
- {
- conn = newAutoCommitConnection();
- stmt = conn.prepareStatement(DELETE_FROM_LINKS);
- stmt.setLong(1, link.getQMFId().getLeastSignificantBits());
- stmt.setLong(2, link.getQMFId().getMostSignificantBits());
- int results = stmt.executeUpdate();
-
- if (results == 0)
- {
- throw new AMQStoreException("Link " + link + " not found");
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error deleting Link " + link + " from database: " + e.getMessage(), e);
- }
- finally
- {
- closePreparedStatement(stmt);
- closeConnection(conn);
- }
-
-
- }
-
- @Override
- public void createBridge(final Bridge bridge) throws AMQStoreException
- {
- _logger.debug("public void createBridge(BrokerLink = " + bridge + "): called");
- if (_stateManager.isInState(State.ACTIVE))
- {
- try
- {
- Connection conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(FIND_BRIDGE);
- try
- {
-
- UUID id = bridge.getQMFId();
- stmt.setLong(1, id.getLeastSignificantBits());
- stmt.setLong(2, id.getMostSignificantBits());
- ResultSet rs = stmt.executeQuery();
- try
- {
-
- // If we don't have any data in the result set then we can add this queue
- if (!rs.next())
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BRIDGES);
-
- try
- {
-
- insertStmt.setLong(1, id.getLeastSignificantBits());
- insertStmt.setLong(2, id.getMostSignificantBits());
-
- insertStmt.setLong(3, bridge.getCreateTime());
-
- UUID linkId = bridge.getLink().getQMFId();
- insertStmt.setLong(4, linkId.getLeastSignificantBits());
- insertStmt.setLong(5, linkId.getMostSignificantBits());
-
- byte[] argumentBytes = convertStringMapToBytes(bridge.getArguments());
- ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
-
- insertStmt.setBinaryStream(6,bis,argumentBytes.length);
-
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- conn.close();
-
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error writing " + bridge + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- @Override
- public void deleteBridge(final Bridge bridge) throws AMQStoreException
- {
- _logger.debug("public void deleteBridge( " + bridge + "): called");
- Connection conn = null;
- PreparedStatement stmt = null;
- try
- {
- conn = newAutoCommitConnection();
- stmt = conn.prepareStatement(DELETE_FROM_BRIDGES);
- stmt.setLong(1, bridge.getQMFId().getLeastSignificantBits());
- stmt.setLong(2, bridge.getQMFId().getMostSignificantBits());
- int results = stmt.executeUpdate();
-
- if (results == 0)
- {
- throw new AMQStoreException("Bridge " + bridge + " not found");
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error deleting bridge " + bridge + " from database: " + e.getMessage(), e);
- }
- finally
- {
- closePreparedStatement(stmt);
- closeConnection(conn);
- }
-
- }
@Override
public Transaction newTransaction()
@@ -2134,8 +1793,9 @@ public class DerbyMessageStore implement
public ByteBuffer getContent(int offsetInMessage, int size)
{
ByteBuffer buf = ByteBuffer.allocate(size);
- getContent(offsetInMessage, buf);
+ int length = getContent(offsetInMessage, buf);
buf.position(0);
+ buf.limit(length);
return buf;
}
@@ -2673,7 +2333,7 @@ public class DerbyMessageStore implement
@Override
public String getStoreType()
{
- return DERBY_STORE_TYPE;
+ return TYPE;
}
}
\ No newline at end of file
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Thu Feb 28 16:14:30 2013
@@ -27,11 +27,6 @@ import org.apache.qpid.common.AMQPFilter
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.SessionConfig;
-import org.apache.qpid.server.configuration.SubscriptionConfig;
-import org.apache.qpid.server.configuration.SubscriptionConfigType;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.flow.FlowCreditManager;
@@ -61,8 +56,7 @@ import java.util.concurrent.locks.Reentr
* Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
* that was given out by the broker and the channel id. <p/>
*/
-public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener,
- SubscriptionConfig
+public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener
{
private StateListener _stateListener = new StateListener()
@@ -91,7 +85,6 @@ public abstract class SubscriptionImpl i
private final long _subscriptionID;
private LogSubject _logSubject;
private LogActor _logActor;
- private UUID _qmfId;
private final AtomicLong _deliveredCount = new AtomicLong(0);
private final AtomicLong _deliveredBytes = new AtomicLong(0);
@@ -373,11 +366,6 @@ public abstract class SubscriptionImpl i
return _channel;
}
- public ConfigStore getConfigStore()
- {
- return getQueue().getConfigStore();
- }
-
public Long getDelivered()
{
return _deliveredCount.get();
@@ -391,9 +379,6 @@ public abstract class SubscriptionImpl i
}
_queue = queue;
- _qmfId = getConfigStore().createId();
- getConfigStore().addConfiguredObject(this);
-
_logSubject = new SubscriptionLogSubject(this);
_logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
@@ -547,8 +532,6 @@ public abstract class SubscriptionImpl i
{
_stateChangeLock.unlock();
}
- getConfigStore().removeConfiguredObject(this);
-
//Log Subscription closed
CurrentActor.get().message(_logSubject, SubscriptionMessages.CLOSE());
}
@@ -752,11 +735,6 @@ public abstract class SubscriptionImpl i
return "WINDOW";
}
- public SessionConfig getSessionConfig()
- {
- return getChannel();
- }
-
public boolean isBrowsing()
{
return isBrowser();
@@ -767,32 +745,16 @@ public abstract class SubscriptionImpl i
return true;
}
- @Override
- public UUID getQMFId()
- {
- return _qmfId;
- }
-
public boolean isDurable()
{
return false;
}
- public SubscriptionConfigType getConfigType()
- {
- return SubscriptionConfigType.getInstance();
- }
-
public boolean isExclusive()
{
return getQueue().hasExclusiveSubscriber();
}
- public ConfiguredObject getParent()
- {
- return getSessionConfig();
- }
-
public String getName()
{
return String.valueOf(_consumerTag);
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Thu Feb 28 16:14:30 2013
@@ -24,11 +24,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.SessionConfig;
-import org.apache.qpid.server.configuration.SubscriptionConfig;
-import org.apache.qpid.server.configuration.SubscriptionConfigType;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.CreditCreditManager;
@@ -86,7 +81,7 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
+public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, LogSubject
{
private final long _subscriptionID;
@@ -125,7 +120,6 @@ public class Subscription_0_10 implement
private LogActor _logActor;
private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
- private UUID _qmfId;
private String _traceExclude;
private String _trace;
private final long _createTime = System.currentTimeMillis();
@@ -192,8 +186,6 @@ public class Subscription_0_10 implement
Map<String, Object> arguments = queue.getArguments();
_traceExclude = (String) arguments.get("qpid.trace.exclude");
_trace = (String) arguments.get("qpid.trace.id");
- _qmfId = getConfigStore().createId();
- getConfigStore().addConfiguredObject(this);
String filterLogString = null;
_logActor = GenericActor.getInstance(this);
@@ -283,7 +275,6 @@ public class Subscription_0_10 implement
}
}
_creditManager.removeListener(this);
- getConfigStore().removeConfiguredObject(this);
CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE());
}
finally
@@ -295,11 +286,6 @@ public class Subscription_0_10 implement
}
- public ConfigStore getConfigStore()
- {
- return getQueue().getConfigStore();
- }
-
public Long getDelivered()
{
return _deliveredCount.get();
@@ -970,12 +956,6 @@ public class Subscription_0_10 implement
return _session;
}
-
- public SessionConfig getSessionConfig()
- {
- return getSessionModel();
- }
-
public boolean isBrowsing()
{
return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
@@ -986,20 +966,11 @@ public class Subscription_0_10 implement
return getQueue().hasExclusiveSubscriber();
}
- public ConfiguredObject getParent()
- {
- return getSessionConfig();
- }
-
public boolean isDurable()
{
return false;
}
- public SubscriptionConfigType getConfigType()
- {
- return SubscriptionConfigType.getInstance();
- }
public boolean isExplicitAcknowledge()
{
@@ -1011,12 +982,6 @@ public class Subscription_0_10 implement
return _flowMode.toString();
}
- @Override
- public UUID getQMFId()
- {
- return _qmfId;
- }
-
public String getName()
{
return _destination;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Thu Feb 28 16:14:30 2013
@@ -20,17 +20,16 @@
*/
package org.apache.qpid.server.transport;
+import java.net.SocketAddress;
import java.security.Principal;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.configuration.ConnectionConfig;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -39,6 +38,7 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Connection;
@@ -48,6 +48,7 @@ import org.apache.qpid.transport.Executi
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.network.NetworkConnection;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
@@ -55,7 +56,6 @@ import static org.apache.qpid.server.log
public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
{
- private ConnectionConfig _config;
private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
private LogActor _actor = GenericActor.getInstance(this);
@@ -69,6 +69,7 @@ public class ServerConnection extends Co
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
private Principal _peerPrincipal;
+ private NetworkConnection _networkConnection;
public ServerConnection(final long connectionId)
{
@@ -147,16 +148,6 @@ public class ServerConnection extends Co
initialiseStatistics();
}
- public void setConnectionConfig(final ConnectionConfig config)
- {
- _config = config;
- }
-
- public ConnectionConfig getConfig()
- {
- return _config;
- }
-
public void onOpen(final Runnable task)
{
_onOpenTask = task;
@@ -228,7 +219,7 @@ public class ServerConnection extends Co
MessageFormat.format(CONNECTION_FORMAT,
getConnectionId(),
getClientId(),
- getConfig().getAddress(),
+ getRemoteAddressString(),
getVirtualHost().getName())
+ "] ";
}
@@ -238,7 +229,7 @@ public class ServerConnection extends Co
MessageFormat.format(USER_FORMAT,
getConnectionId(),
getClientId(),
- getConfig().getAddress())
+ getRemoteAddressString())
+ "] ";
}
@@ -247,7 +238,7 @@ public class ServerConnection extends Co
return "[" +
MessageFormat.format(SOCKET_FORMAT,
getConnectionId(),
- getConfig().getAddress())
+ getRemoteAddressString())
+ "] ";
}
}
@@ -396,7 +387,7 @@ public class ServerConnection extends Co
else
{
_authorizedSubject = authorizedSubject;
- _authorizedPrincipal = authorizedSubject.getPrincipals().iterator().next();
+ _authorizedPrincipal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(authorizedSubject);
}
}
@@ -417,7 +408,7 @@ public class ServerConnection extends Co
public String getRemoteAddressString()
{
- return getConfig().getAddress();
+ return String.valueOf(getRemoteAddress());
}
public String getUserName()
@@ -489,4 +480,32 @@ public class ServerConnection extends Co
{
_peerPrincipal = peerPrincipal;
}
+
+ @Override
+ public void setRemoteAddress(SocketAddress remoteAddress)
+ {
+ super.setRemoteAddress(remoteAddress);
+ }
+
+ @Override
+ public void setLocalAddress(SocketAddress localAddress)
+ {
+ super.setLocalAddress(localAddress);
+ }
+
+ public void setNetworkConnection(NetworkConnection network)
+ {
+ _networkConnection = network;
+ }
+
+ public NetworkConnection getNetworkConnection()
+ {
+ return _networkConnection;
+ }
+
+ public void doHeartbeat()
+ {
+ super.doHeartBeat();
+
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Thu Feb 28 16:14:30 2013
@@ -32,18 +32,19 @@ import javax.security.sasl.SaslException
import javax.security.sasl.SaslServer;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.properties.ConnectionStartProperties;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.configuration.BrokerConfig;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.virtualhost.State;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.NetworkConnection;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,36 +54,49 @@ public class ServerConnectionDelegate ex
{
private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class);
+ private final Broker _broker;
private final String _localFQDN;
- private final IApplicationRegistry _appRegistry;
private int _maxNoOfChannels;
private Map<String,Object> _clientProperties;
- private final AuthenticationManager _authManager;
+ private final SubjectCreator _subjectCreator;
- public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN, AuthenticationManager authManager)
+ public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator)
{
- this(createConnectionProperties(appRegistry.getBrokerConfig()), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN, authManager);
+ this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator);
}
private ServerConnectionDelegate(Map<String, Object> properties,
List<Object> locales,
- IApplicationRegistry appRegistry,
+ Broker broker,
String localFQDN,
- AuthenticationManager authManager)
+ SubjectCreator subjectCreator)
{
- super(properties, parseToList(authManager.getMechanisms()), locales);
+ super(properties, parseToList(subjectCreator.getMechanisms()), locales);
- _appRegistry = appRegistry;
+ _broker = broker;
_localFQDN = localFQDN;
- _maxNoOfChannels = appRegistry.getConfiguration().getMaxChannelCount();
- _authManager = authManager;
+ _maxNoOfChannels = (Integer)broker.getAttribute(Broker.SESSION_COUNT_LIMIT);
+ _subjectCreator = subjectCreator;
+ }
+
+ private static List<String> getFeatures(Broker broker)
+ {
+ String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES);
+ final List<String> features = new ArrayList<String>();
+ if (brokerDisabledFeatures == null || !brokerDisabledFeatures.contains(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR))
+ {
+ features.add(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+ }
+
+ return Collections.unmodifiableList(features);
}
- private static Map<String, Object> createConnectionProperties(final BrokerConfig brokerConfig)
+ private static Map<String, Object> createConnectionProperties(final Broker broker)
{
final Map<String,Object> map = new HashMap<String,Object>(2);
- map.put(ServerPropertyNames.FEDERATION_TAG, brokerConfig.getFederationTag());
- final List<String> features = brokerConfig.getFeatures();
+ // Federation tag is used by the client to identify the broker instance
+ map.put(ServerPropertyNames.FEDERATION_TAG, broker.getId().toString());
+ final List<String> features = getFeatures(broker);
if (features != null && features.size() > 0)
{
map.put(ServerPropertyNames.QPID_FEATURES, features);
@@ -112,14 +126,14 @@ public class ServerConnectionDelegate ex
protected SaslServer createSaslServer(Connection conn, String mechanism) throws SaslException
{
- return _authManager.createSaslServer(mechanism, _localFQDN, ((ServerConnection) conn).getPeerPrincipal());
+ return _subjectCreator.createSaslServer(mechanism, _localFQDN, ((ServerConnection) conn).getPeerPrincipal());
}
protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
{
final ServerConnection sconn = (ServerConnection) conn;
- final AuthenticationResult authResult = _authManager.authenticate(ss, response);
+ final SubjectAuthenticationResult authResult = _subjectCreator.authenticate(ss, response);
if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
{
@@ -166,7 +180,7 @@ public class ServerConnectionDelegate ex
{
vhostName = "";
}
- vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
+ vhost = _broker.getVirtualHostRegistry().getVirtualHost(vhostName);
SecurityManager.setThreadSubject(sconn.getAuthorizedSubject());
@@ -174,7 +188,7 @@ public class ServerConnectionDelegate ex
{
sconn.setVirtualHost(vhost);
- if (!vhost.getSecurityManager().accessVirtualhost(vhostName, ((ProtocolEngine) sconn.getConfig()).getRemoteAddress()))
+ if (!vhost.getSecurityManager().accessVirtualhost(vhostName, sconn.getRemoteAddress()))
{
sconn.setState(Connection.State.CLOSING);
sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'"));
@@ -215,14 +229,18 @@ public class ServerConnectionDelegate ex
return;
}
- setConnectionTuneOkChannelMax(sconn, okChannelMax);
- }
+ if(ok.hasHeartbeat())
+ {
+ final int heartbeat = ok.getHeartbeat();
+ if(heartbeat > 0)
+ {
+ final NetworkConnection networkConnection = sconn.getNetworkConnection();
+ networkConnection.setMaxReadIdle(2 * heartbeat);
+ networkConnection.setMaxWriteIdle(heartbeat);
+ }
+ }
- @Override
- protected int getHeartbeatMax()
- {
- //TODO: implement broker support for actually sending heartbeats
- return 0;
+ setConnectionTuneOkChannelMax(sconn, okChannelMax);
}
@Override
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Thu Feb 28 16:14:30 2013
@@ -42,13 +42,8 @@ import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.TransactionTimeoutHelper;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.ConnectionConfig;
-import org.apache.qpid.server.configuration.SessionConfig;
-import org.apache.qpid.server.configuration.SessionConfigType;
+import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -91,7 +86,7 @@ import static org.apache.qpid.server.log
import static org.apache.qpid.util.Serial.gt;
public class ServerSession extends Session
- implements AuthorizationHolder, SessionConfig,
+ implements AuthorizationHolder,
AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
{
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
@@ -100,8 +95,7 @@ public class ServerSession extends Sessi
private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
- private final UUID _id;
- private ConnectionConfig _connectionConfig;
+ private final UUID _id = UUID.randomUUID();
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
@@ -139,7 +133,6 @@ public class ServerSession extends Sessi
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private final AtomicLong _txnUpdateTime = new AtomicLong(0);
private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
@@ -147,21 +140,21 @@ public class ServerSession extends Sessi
private final TransactionTimeoutHelper _transactionTimeoutHelper;
- ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
- {
- this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
- }
- public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
+ public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
super(connection, delegate, name, expiry);
- _connectionConfig = connConfig;
_transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
_logSubject = new ChannelLogSubject(this);
- _id = getConfigStore().createId();
- getConfigStore().addConfiguredObject(this);
- _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
+ _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
+ {
+ @Override
+ public void doTimeoutAction(String reason) throws AMQException
+ {
+ getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
+ }
+ });
}
protected void setState(State state)
@@ -184,12 +177,6 @@ public class ServerSession extends Sessi
invoke(new MessageStop(""));
}
- private ConfigStore getConfigStore()
- {
- return getConnectionConfig().getConfigStore();
- }
-
-
@Override
protected boolean isFull(int id)
{
@@ -206,9 +193,8 @@ public class ServerSession extends Sessi
}
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ;
- _transaction.enqueue(queues,message, postTransactionAction, 0L);
+ _transaction.enqueue(queues,message, postTransactionAction);
incrementOutstandingTxnsIfNecessary();
- updateTransactionalActivity();
}
@@ -389,8 +375,6 @@ public class ServerSession extends Sessi
}
_messageDispositionListenerMap.clear();
- getConfigStore().removeConfiguredObject(this);
-
for (Task task : _taskList)
{
task.doTask(this);
@@ -424,7 +408,6 @@ public class ServerSession extends Sessi
entry.release();
}
});
- updateTransactionalActivity();
}
public Collection<Subscription_0_10> getSubscriptions()
@@ -470,11 +453,6 @@ public class ServerSession extends Sessi
return _transaction.isTransactional();
}
- public boolean inTransaction()
- {
- return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
- }
-
public void selectTx()
{
_transaction = new LocalTransaction(this.getMessageStore());
@@ -609,22 +587,6 @@ public class ServerSession extends Sessi
}
}
- /**
- * Update last transaction activity timestamp
- */
- public void updateTransactionalActivity()
- {
- if (isTransactional())
- {
- _txnUpdateTime.set(System.currentTimeMillis());
- }
- }
-
- public Long getTxnStarts()
- {
- return _txnStarts.get();
- }
-
public Long getTxnCommits()
{
return _txnCommits.get();
@@ -682,23 +644,7 @@ public class ServerSession extends Sessi
public VirtualHost getVirtualHost()
{
- return (VirtualHost) _connectionConfig.getVirtualHost();
- }
-
- @Override
- public UUID getQMFId()
- {
- return _id;
- }
-
- public SessionConfigType getConfigType()
- {
- return SessionConfigType.getInstance();
- }
-
- public ConfiguredObject getParent()
- {
- return getVirtualHost();
+ return getConnection().getVirtualHost();
}
public boolean isDurable()
@@ -706,44 +652,16 @@ public class ServerSession extends Sessi
return false;
}
- public boolean isAttached()
- {
- return true;
- }
-
- public long getDetachedLifespan()
- {
- return 0;
- }
-
- public Long getExpiryTime()
- {
- return null;
- }
-
- public Long getMaxClientRate()
- {
- return null;
- }
-
- public ConnectionConfig getConnectionConfig()
- {
- return _connectionConfig;
- }
-
- public String getSessionName()
- {
- return getName().toString();
- }
public long getCreateTime()
{
return _createTime;
}
- public void mgmtClose()
+ @Override
+ public UUID getId()
{
- close();
+ return _id;
}
public AMQConnectionModel getConnectionModel()
@@ -774,28 +692,7 @@ public class ServerSession extends Sessi
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
- if (inTransaction())
- {
- long currentTime = System.currentTimeMillis();
- long openTime = currentTime - _transaction.getTransactionStartTime();
- long idleTime = currentTime - _txnUpdateTime.get();
-
- _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
- TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
- if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
- {
- getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
- return;
- }
-
- _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
- TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
- if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
- {
- getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
- return;
- }
- }
+ _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
}
public void block(AMQQueue queue)
@@ -878,9 +775,7 @@ public class ServerSession extends Sessi
? getConnection().getConnectionId()
: -1;
- String remoteAddress = _connectionConfig instanceof ProtocolEngine
- ? ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString()
- : "";
+ String remoteAddress = String.valueOf(getConnection().getRemoteAddress());
return "[" +
MessageFormat.format(CHANNEL_FORMAT,
connectionId,
@@ -1065,14 +960,16 @@ public class ServerSession extends Sessi
super.setClose(close);
}
- public int compareTo(AMQSessionModel session)
+ @Override
+ public int getConsumerCount()
{
- return getQMFId().compareTo(session.getQMFId());
+ return _subscriptions.values().size();
}
@Override
- public int getConsumerCount()
+ public int compareTo(AMQSessionModel o)
{
- return _subscriptions.values().size();
+ return getId().compareTo(o.getId());
}
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Thu Feb 28 16:14:30 2013
@@ -31,7 +31,6 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.exchange.HeadersExchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -41,11 +40,11 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
@@ -1266,18 +1265,12 @@ public class ServerSessionDelegate exten
}
}
queueRegistry.registerQueue(queue);
- boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister();
- if (autoRegister)
- {
-
- ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
+ ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
- Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
+ Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
- virtualHost.getBindingFactory().addBinding(queueName, queue, defaultExchange, null);
-
- }
+ virtualHost.getBindingFactory().addBinding(queueName, queue, defaultExchange, null);
if (method.hasAutoDelete()
&& method.getAutoDelete()
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Thu Feb 28 16:14:30 2013
@@ -66,11 +66,18 @@ public class AsyncAutoCommitTransaction
_futureRecorder = recorder;
}
+ @Override
public long getTransactionStartTime()
{
return 0L;
}
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return 0L;
+ }
+
/**
* Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
* by the caller are executed immediately.
@@ -241,7 +248,7 @@ public class AsyncAutoCommitTransaction
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Thu Feb 28 16:14:30 2013
@@ -52,11 +52,18 @@ public class AutoCommitTransaction imple
_messageStore = transactionLog;
}
+ @Override
public long getTransactionStartTime()
{
return 0L;
}
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return 0L;
+ }
+
/**
* Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
* by the caller are executed immediately.
@@ -178,7 +185,7 @@ public class AutoCommitTransaction imple
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
@@ -270,4 +277,6 @@ public class AutoCommitTransaction imple
}
}
+
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java Thu Feb 28 16:14:30 2013
@@ -26,7 +26,6 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Xid;
@@ -39,10 +38,6 @@ public class DistributedTransaction impl
private final AutoCommitTransaction _autoCommitTransaction;
- private volatile Transaction _transaction;
-
- private long _txnStartTime = 0L;
-
private DtxBranch _branch;
private AMQSessionModel _session;
private VirtualHost _vhost;
@@ -55,9 +50,16 @@ public class DistributedTransaction impl
_autoCommitTransaction = new AutoCommitTransaction(vhost.getMessageStore());
}
+ @Override
public long getTransactionStartTime()
{
- return _txnStartTime;
+ return 0;
+ }
+
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return 0;
}
public void addPostTransactionAction(Action postTransactionAction)
@@ -107,7 +109,7 @@ public class DistributedTransaction impl
{
_branch.enqueue(queue, message);
_branch.addPostTransactionAcion(postTransactionAction);
- enqueue(Collections.singletonList(queue), message, postTransactionAction, System.currentTimeMillis());
+ enqueue(Collections.singletonList(queue), message, postTransactionAction);
}
else
{
@@ -116,7 +118,7 @@ public class DistributedTransaction impl
}
public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message,
- Action postTransactionAction, long currentTime)
+ Action postTransactionAction)
{
if(_branch != null)
{
@@ -128,7 +130,7 @@ public class DistributedTransaction impl
}
else
{
- _autoCommitTransaction.enqueue(queues, message, postTransactionAction, currentTime);
+ _autoCommitTransaction.enqueue(queues, message, postTransactionAction);
}
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Thu Feb 28 16:14:30 2013
@@ -49,25 +49,42 @@ public class LocalTransaction implements
private final List<Action> _postTransactionActions = new ArrayList<Action>();
private volatile Transaction _transaction;
- private MessageStore _transactionLog;
- private long _txnStartTime = 0L;
+ private final ActivityTimeAccessor _activityTime;
+ private final MessageStore _transactionLog;
+ private volatile long _txnStartTime = 0L;
+ private volatile long _txnUpdateTime = 0l;
private StoreFuture _asyncTran;
public LocalTransaction(MessageStore transactionLog)
{
- _transactionLog = transactionLog;
+ this(transactionLog, new ActivityTimeAccessor()
+ {
+ @Override
+ public long getActivityTime()
+ {
+ return System.currentTimeMillis();
+ }
+ });
}
-
- public boolean inTransaction()
+
+ public LocalTransaction(MessageStore transactionLog, ActivityTimeAccessor activityTime)
{
- return _transaction != null;
+ _transactionLog = transactionLog;
+ _activityTime = activityTime;
}
+ @Override
public long getTransactionStartTime()
{
return _txnStartTime;
}
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return _txnUpdateTime;
+ }
+
public void addPostTransactionAction(Action postTransactionAction)
{
sync();
@@ -78,6 +95,7 @@ public class LocalTransaction implements
{
sync();
_postTransactionActions.add(postTransactionAction);
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
if(message.isPersistent() && queue.isDurable())
{
@@ -104,6 +122,7 @@ public class LocalTransaction implements
{
sync();
_postTransactionActions.add(postTransactionAction);
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
try
{
@@ -180,6 +199,7 @@ public class LocalTransaction implements
{
sync();
_postTransactionActions.add(postTransactionAction);
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
if(message.isPersistent() && queue.isDurable())
{
@@ -189,7 +209,7 @@ public class LocalTransaction implements
{
_logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
}
-
+
beginTranIfNecessary();
_transaction.enqueueMessage(queue, message);
}
@@ -202,15 +222,11 @@ public class LocalTransaction implements
}
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
-
- if (_txnStartTime == 0L)
- {
- _txnStartTime = currentTime == 0L ? System.currentTimeMillis() : currentTime;
- }
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
if(message.isPersistent())
{
@@ -224,8 +240,7 @@ public class LocalTransaction implements
{
_logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() );
}
-
-
+
beginTranIfNecessary();
_transaction.enqueueMessage(queue, message);
}
@@ -378,16 +393,24 @@ public class LocalTransaction implements
}
throw new RuntimeException("Failed to commit transaction", e);
}
-
-
}
private void doPostTransactionActions()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Beginning " + _postTransactionActions.size() + " post transaction actions");
+ }
+
for(int i = 0; i < _postTransactionActions.size(); i++)
{
_postTransactionActions.get(i).postCommit();
}
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Completed post transaction actions");
+ }
}
public void rollback()
@@ -427,16 +450,34 @@ public class LocalTransaction implements
}
}
+ private void initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime()
+ {
+ long currentTime = _activityTime.getActivityTime();
+
+ if (_txnStartTime == 0)
+ {
+ _txnStartTime = currentTime;
+ }
+ _txnUpdateTime = currentTime;
+ }
+
private void resetDetails()
{
_asyncTran = null;
_transaction = null;
- _postTransactionActions.clear();
+ _postTransactionActions.clear();
_txnStartTime = 0L;
+ _txnUpdateTime = 0;
}
public boolean isTransactional()
{
return true;
}
+
+ public interface ActivityTimeAccessor
+ {
+ long getActivityTime();
+ }
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Thu Feb 28 16:14:30 2013
@@ -55,11 +55,18 @@ public interface ServerTransaction
/**
* Return the time the current transaction started.
- *
+ *
* @return the time this transaction started or 0 if not in a transaction
*/
long getTransactionStartTime();
+ /**
+ * Return the time of the last activity on the current transaction.
+ *
+ * @return the time of the last activity or 0 if not in a transaction
+ */
+ long getTransactionUpdateTime();
+
/**
* Register an Action for execution after transaction commit or rollback. Actions
* will be executed in the order in which they are registered.
@@ -92,7 +99,7 @@ public interface ServerTransaction
*
* Store operations will result only for a persistent messages on durable queues.
*/
- void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime);
+ void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction);
/**
* Commit the transaction represented by this object.
Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:r1375509-1450773
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Thu Feb 28 16:14:30 2013
@@ -25,13 +25,10 @@ import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.binding.BindingFactory;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.VirtualHostConfig;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -41,7 +38,7 @@ import org.apache.qpid.server.store.Dura
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.DtxRegistry;
-public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
+public interface VirtualHost extends DurableConfigurationStore.Source, Closeable, StatisticsGatherer
{
IConnectionRegistry getConnectionRegistry();
@@ -61,8 +58,6 @@ public interface VirtualHost extends Dur
void close();
- UUID getBrokerId();
-
UUID getId();
void scheduleHouseKeepingTask(long period, HouseKeepingTask task);
@@ -77,25 +72,12 @@ public interface VirtualHost extends Dur
int getHouseKeepingActiveCount();
- IApplicationRegistry getApplicationRegistry();
+ VirtualHostRegistry getVirtualHostRegistry();
BindingFactory getBindingFactory();
- void createBrokerConnection(String transport,
- String host,
- int port,
- String vhost,
- boolean durable,
- String authMechanism, String username, String password);
-
- public BrokerLink createBrokerConnection(UUID id, long createTime, Map<String,String> arguments);
-
- ConfigStore getConfigStore();
-
DtxRegistry getDtxRegistry();
- void removeBrokerConnection(BrokerLink brokerLink);
-
LinkRegistry getLinkRegistry(String remoteContainerId);
ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org