You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [25/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Fri Oct 21 01:19:00 2011
@@ -20,47 +20,26 @@
*/
package org.apache.qpid.server.transport;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
+import org.apache.qpid.transport.*;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
-import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
-import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionClose;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ConnectionOpen;
-import org.apache.qpid.transport.ConnectionOpenOk;
-import org.apache.qpid.transport.ConnectionTuneOk;
-import org.apache.qpid.transport.ServerDelegate;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionAttach;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.SessionDetach;
-import org.apache.qpid.transport.SessionDetachCode;
-import org.apache.qpid.transport.SessionDetached;
+
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslException;
+import java.util.*;
public class ServerConnectionDelegate extends ServerDelegate
{
private String _localFQDN;
private final IApplicationRegistry _appRegistry;
+
public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
{
this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
@@ -89,42 +68,24 @@ public class ServerConnectionDelegate ex
return list;
}
+ @Override
public ServerSession getSession(Connection conn, SessionAttach atc)
{
- SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
+ SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry);
ServerSession ssn = new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
return ssn;
}
+ @Override
protected SaslServer createSaslServer(String mechanism) throws SaslException
{
return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN);
}
- protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
- {
- final AuthenticationResult authResult = _appRegistry.getAuthenticationManager().authenticate(ss, response);
- final ServerConnection sconn = (ServerConnection) conn;
-
-
- if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
- {
- tuneAuthorizedConnection(sconn);
- sconn.setAuthorizedSubject(authResult.getSubject());
- }
- else if (AuthenticationStatus.CONTINUE.equals(authResult.getStatus()))
- {
- connectionAuthContinue(sconn, authResult.getChallenge());
- }
- else
- {
- connectionAuthFailed(sconn, authResult.getCause());
- }
- }
-
+ @Override
public void connectionClose(Connection conn, ConnectionClose close)
{
try
@@ -138,9 +99,10 @@ public class ServerConnectionDelegate ex
}
+ @Override
public void connectionOpen(Connection conn, ConnectionOpen open)
{
- final ServerConnection sconn = (ServerConnection) conn;
+ ServerConnection sconn = (ServerConnection) conn;
VirtualHost vhost;
String vhostName;
@@ -154,7 +116,7 @@ public class ServerConnectionDelegate ex
}
vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
- SecurityManager.setThreadSubject(sconn.getAuthorizedSubject());
+ SecurityManager.setThreadPrincipal(conn.getAuthorizationID());
if(vhost != null)
{
@@ -176,27 +138,6 @@ public class ServerConnectionDelegate ex
sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
sconn.setState(Connection.State.CLOSING);
}
-
- }
-
- @Override
- public void connectionTuneOk(final Connection conn, final ConnectionTuneOk ok)
- {
- ServerConnection sconn = (ServerConnection) conn;
- int okChannelMax = ok.getChannelMax();
-
- if (okChannelMax > getChannelMax())
- {
- _logger.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
- "client connectionTuneOk returned a channelMax (" + okChannelMax +
- ") above the servers offered limit (" + getChannelMax() +")");
-
- //Due to the error we must forcefully close the connection without negotiation
- sconn.getSender().close();
- return;
- }
-
- setConnectionTuneOkChannelMax(sconn, okChannelMax);
}
@Override
@@ -211,59 +152,4 @@ public class ServerConnectionDelegate ex
{
return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
}
-
- @Override public void sessionDetach(Connection conn, SessionDetach dtc)
- {
- // To ensure a clean detach, we unregister any remaining subscriptions. Unregister ensures
- // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the unregister
- // completes.
- unregisterAllSubscriptions(conn, dtc);
- super.sessionDetach(conn, dtc);
- }
-
- private void unregisterAllSubscriptions(Connection conn, SessionDetach dtc)
- {
- final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
- final Collection<Subscription_0_10> subs = ssn.getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subs)
- {
- ssn.unregister(subscription_0_10);
- }
- }
-
- @Override
- public void sessionAttach(final Connection conn, final SessionAttach atc)
- {
- final String clientId = new String(atc.getName());
- final Session ssn = getSession(conn, atc);
-
- if(isSessionNameUnique(clientId,conn))
- {
- conn.registerSession(ssn);
- super.sessionAttach(conn, atc);
- }
- else
- {
- ssn.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY));
- ssn.closed();
- }
- }
-
- private boolean isSessionNameUnique(final String name, final Connection conn)
- {
- final ServerConnection sconn = (ServerConnection) conn;
- final String userId = sconn.getUserName();
-
- final Iterator<AMQConnectionModel> connections =
- ((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator();
- while(connections.hasNext())
- {
- final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next();
- if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name))
- {
- return false;
- }
- }
- return true;
- }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Fri Oct 21 01:19:00 2011
@@ -23,25 +23,9 @@ package org.apache.qpid.server.transport
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
import static org.apache.qpid.util.Serial.gt;
-import java.lang.ref.WeakReference;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.security.auth.Subject;
+import com.sun.security.auth.UserPrincipal;
import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -54,18 +38,18 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageTransfer;
@@ -74,13 +58,24 @@ import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject
+import java.lang.ref.WeakReference;
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject
{
- private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
-
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
private final UUID _id;
@@ -116,7 +111,8 @@ public class ServerSession extends Sessi
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private final AtomicLong _txnUpdateTime = new AtomicLong(0);
+
+ private Principal _principal;
private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
@@ -129,27 +125,27 @@ public class ServerSession extends Sessi
this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
}
- public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
- {
- super(connection, delegate, name, expiry);
- _connectionConfig = connConfig;
- _transaction = new AutoCommitTransaction(this.getMessageStore());
-
- _reference = new WeakReference<Session>(this);
- _id = getConfigStore().createId();
- getConfigStore().addConfiguredObject(this);
- }
-
protected void setState(State state)
{
super.setState(state);
if (state == State.OPEN)
{
- _actor.message(ChannelMessages.CREATE());
+ _actor.message(ChannelMessages.CREATE());
}
}
+ public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
+ {
+ super(connection, delegate, name, expiry);
+ _connectionConfig = connConfig;
+ _transaction = new AutoCommitTransaction(this.getMessageStore());
+ _principal = new UserPrincipal(connection.getAuthorizationID());
+ _reference = new WeakReference(this);
+ _id = getConfigStore().createId();
+ getConfigStore().addConfiguredObject(this);
+ }
+
private ConfigStore getConfigStore()
{
return getConnectionConfig().getConfigStore();
@@ -164,8 +160,8 @@ public class ServerSession extends Sessi
public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
{
- getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
- _transaction.enqueue(queues,message, new ServerTransaction.Action()
+
+ _transaction.enqueue(queues,message, new ServerTransaction.Action()
{
BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
@@ -193,14 +189,12 @@ public class ServerSession extends Sessi
});
incrementOutstandingTxnsIfNecessary();
- updateTransactionalActivity();
}
public void sendMessage(MessageTransfer xfr,
Runnable postIdSettingAction)
{
- getConnectionModel().registerMessageDelivered(xfr.getBodySize());
invoke(xfr, postIdSettingAction);
}
@@ -383,7 +377,6 @@ public class ServerSession extends Sessi
entry.release();
}
});
- updateTransactionalActivity();
}
public Collection<Subscription_0_10> getSubscriptions()
@@ -417,7 +410,7 @@ public class ServerSession extends Sessi
catch (AMQException e)
{
// TODO
- _logger.error("Failed to unregister subscription", e);
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
finally
{
@@ -432,11 +425,6 @@ public class ServerSession extends Sessi
// theory
return !(_transaction instanceof AutoCommitTransaction);
}
-
- public boolean inTransaction()
- {
- return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
- }
public void selectTx()
{
@@ -483,17 +471,6 @@ public class ServerSession extends Sessi
}
}
- /**
- * Update last transaction activity timestamp
- */
- public void updateTransactionalActivity()
- {
- if (isTransactional())
- {
- _txnUpdateTime.set(System.currentTimeMillis());
- }
- }
-
public Long getTxnStarts()
{
return _txnStarts.get();
@@ -514,14 +491,9 @@ public class ServerSession extends Sessi
return _txnCount.get();
}
- public Principal getAuthorizedPrincipal()
+ public Principal getPrincipal()
{
- return ((ServerConnection) getConnection()).getAuthorizedPrincipal();
- }
-
- public Subject getAuthorizedSubject()
- {
- return ((ServerConnection) getConnection()).getAuthorizedSubject();
+ return _principal;
}
public void addSessionCloseTask(Task task)
@@ -634,61 +606,18 @@ public class ServerSession extends Sessi
return (LogSubject) this;
}
- public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
- {
- if (inTransaction())
- {
- long currentTime = System.currentTimeMillis();
- long openTime = currentTime - _transaction.getTransactionStartTime();
- long idleTime = currentTime - _txnUpdateTime.get();
-
- // Log a warning on idle or open transactions
- if (idleWarn > 0L && idleTime > idleWarn)
- {
- CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(idleTime));
- _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
- }
- else if (openWarn > 0L && openTime > openWarn)
- {
- CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
- _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
- }
-
- // Close connection for idle or open transactions that have timed out
- if (idleClose > 0L && idleTime > idleClose)
- {
- getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
- }
- else if (openClose > 0L && openTime > openClose)
- {
- getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
- }
- }
- }
-
+ @Override
public String toLogString()
{
return "[" +
MessageFormat.format(CHANNEL_FORMAT,
- ((ServerConnection) getConnection()).getConnectionId(),
+ getConnection().getConnectionId(),
getClientID(),
((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(),
getVirtualHost().getName(),
getChannel())
+ "] ";
- }
- @Override
- public void close()
- {
- // unregister subscriptions in order to prevent sending of new messages
- // to subscriptions with closing session
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
- {
- unregister(subscription_0_10);
- }
-
- super.close();
}
+
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Fri Oct 21 01:19:00 2011
@@ -25,34 +25,31 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeInUseException;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeType;
-import org.apache.qpid.server.exchange.HeadersExchange;
+import org.apache.qpid.server.exchange.*;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.flow.FlowCreditManager_0_10;
import org.apache.qpid.server.flow.WindowCreditManager;
-import org.apache.qpid.server.logging.messages.ExchangeMessages;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Acquired;
@@ -98,34 +95,26 @@ import org.apache.qpid.transport.TxSelec
public class ServerSessionDelegate extends SessionDelegate
{
- private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class);
+ private final IApplicationRegistry _appRegistry;
- public ServerSessionDelegate()
+ public ServerSessionDelegate(IApplicationRegistry appRegistry)
{
-
+ _appRegistry = appRegistry;
}
@Override
public void command(Session session, Method method)
{
- try
- {
- setThreadSubject(session);
+ SecurityManager.setThreadPrincipal(session.getConnection().getAuthorizationID());
- if(!session.isClosing())
+ if(!session.isClosing())
+ {
+ super.command(session, method);
+ if (method.isSync())
{
- super.command(session, method);
- if (method.isSync())
- {
- session.flushProcessed();
- }
+ session.flushProcessed();
}
}
- catch(RuntimeException e)
- {
- LOGGER.error("Exception processing command", e);
- exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Exception processing command: " + e);
- }
}
@Override
@@ -134,6 +123,8 @@ public class ServerSessionDelegate exten
((ServerSession)session).accept(method.getTransfers());
}
+
+
@Override
public void messageReject(Session session, MessageReject method)
{
@@ -168,6 +159,7 @@ public class ServerSessionDelegate exten
@Override
public void messageSubscribe(Session session, MessageSubscribe method)
{
+
//TODO - work around broken Python tests
if(!method.hasAcceptMode())
{
@@ -211,33 +203,32 @@ public class ServerSessionDelegate exten
{
exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
}
- else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
+ else if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
else
{
+
if(queue.isExclusive())
{
- ServerSession s = (ServerSession) session;
- queue.setExclusiveOwningSession(s);
- if(queue.getAuthorizationHolder() == null)
+ if(queue.getPrincipalHolder() == null)
{
- queue.setAuthorizationHolder(s);
- queue.setExclusiveOwningSession(s);
+ queue.setPrincipalHolder((ServerSession)session);
((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
{
+
public void doTask(ServerSession session)
{
- if(queue.getAuthorizationHolder() == session)
+ if(queue.getPrincipalHolder() == session)
{
- queue.setAuthorizationHolder(null);
- queue.setExclusiveOwningSession(null);
+ queue.setPrincipalHolder(null);
}
}
});
}
+
}
FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
@@ -253,7 +244,7 @@ public class ServerSessionDelegate exten
return;
}
- Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
+ Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
destination,
method.getAcceptMode(),
method.getAcquireMode(),
@@ -284,10 +275,25 @@ public class ServerSessionDelegate exten
}
}
+
@Override
public void messageTransfer(Session ssn, MessageTransfer xfr)
{
- final Exchange exchange = getExchangeForMessage(ssn, xfr);
+ ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
+ Exchange exchange;
+ if(xfr.hasDestination())
+ {
+ exchange = exchangeRegistry.getExchange(xfr.getDestination());
+ if(exchange == null)
+ {
+ exchange = exchangeRegistry.getDefaultExchange();
+ }
+ }
+ else
+ {
+ exchange = exchangeRegistry.getDefaultExchange();
+ }
+
DeliveryProperties delvProps = null;
if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -295,7 +301,7 @@ public class ServerSessionDelegate exten
delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
}
- final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
+ MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
{
@@ -305,63 +311,65 @@ public class ServerSessionDelegate exten
return;
}
-
- final Exchange exchangeInUse;
- ArrayList<? extends BaseQueue> queues = exchange.route(messageMetaData);
- if(queues.isEmpty() && exchange.getAlternateExchange() != null)
- {
- final Exchange alternateExchange = exchange.getAlternateExchange();
- queues = alternateExchange.route(messageMetaData);
- if (!queues.isEmpty())
- {
- exchangeInUse = alternateExchange;
- }
- else
- {
- exchangeInUse = exchange;
- }
- }
- else
+
+ final MessageStore store = getVirtualHost(ssn).getMessageStore();
+ StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
+ ByteBuffer body = xfr.getBody();
+ if(body != null)
{
- exchangeInUse = exchange;
+ storeMessage.addContent(0, body);
}
+ storeMessage.flushToStore();
+ MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
+
+ ArrayList<? extends BaseQueue> queues = exchange.route(message);
- if(!queues.isEmpty())
+
+
+ if(queues != null && queues.size() != 0)
{
- final MessageStore store = getVirtualHost(ssn).getMessageStore();
- final StoredMessage<MessageMetaData_0_10> storeMessage = createAndFlushStoreMessage(xfr, messageMetaData, store);
- MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
((ServerSession) ssn).enqueue(message, queues);
}
else
{
- if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ if(delvProps == null || !delvProps.hasDiscardUnroutable() || !delvProps.getDiscardUnroutable())
{
- RangeSet rejects = new RangeSet();
- rejects.add(xfr.getId());
- MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
- ssn.invoke(reject);
- }
- else
- {
- ((ServerSession) ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
+ if(xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ {
+ RangeSet rejects = new RangeSet();
+ rejects.add(xfr.getId());
+ MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
+ ssn.invoke(reject);
+ }
+ else
+ {
+ Exchange alternate = exchange.getAlternateExchange();
+ if(alternate != null)
+ {
+ queues = alternate.route(message);
+ if(queues != null && queues.size() != 0)
+ {
+ ((ServerSession) ssn).enqueue(message, queues);
+ }
+ else
+ {
+ //TODO - log the message discard
+ }
+ }
+ else
+ {
+ //TODO - log the message discard
+ }
+
+
+ }
}
+
+
}
ssn.processed(xfr);
- }
- private StoredMessage<MessageMetaData_0_10> createAndFlushStoreMessage(final MessageTransfer xfr,
- final MessageMetaData_0_10 messageMetaData, final MessageStore store)
- {
- final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
- ByteBuffer body = xfr.getBody();
- if(body != null)
- {
- storeMessage.addContent(0, body);
- }
- storeMessage.flushToStore();
- return storeMessage;
}
@Override
@@ -381,7 +389,7 @@ public class ServerSessionDelegate exten
((ServerSession)session).unregister(sub);
if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0)
{
- queue.setAuthorizationHolder(null);
+ queue.setPrincipalHolder(null);
}
}
}
@@ -440,19 +448,6 @@ public class ServerSessionDelegate exten
VirtualHost virtualHost = getVirtualHost(session);
Exchange exchange = getExchange(session, exchangeName);
- //we must check for any unsupported arguments present and throw not-implemented
- if(method.hasArguments())
- {
- Map<String,Object> args = method.getArguments();
-
- //QPID-3392: currently we don't support any!
- if(!args.isEmpty())
- {
- exception(session, method, ExecutionErrorCode.NOT_IMPLEMENTED, "Unsupported exchange argument(s) found " + args.keySet().toString());
- return;
- }
- }
-
if(method.getPassive())
{
if(exchange == null)
@@ -462,6 +457,7 @@ public class ServerSessionDelegate exten
}
else
{
+ // TODO - check exchange has same properties
if(!exchange.getTypeShortString().toString().equals(method.getType()))
{
exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
@@ -566,25 +562,6 @@ public class ServerSessionDelegate exten
}
- private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
- {
- final ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
- Exchange exchange;
- if(xfr.hasDestination())
- {
- exchange = exchangeRegistry.getExchange(xfr.getDestination());
- if(exchange == null)
- {
- exchange = exchangeRegistry.getDefaultExchange();
- }
- }
- else
- {
- exchange = exchangeRegistry.getDefaultExchange();
- }
- return exchange;
- }
-
private VirtualHost getVirtualHost(Session session)
{
ServerConnection conn = getServerConnection(session);
@@ -606,12 +583,6 @@ public class ServerSessionDelegate exten
try
{
- if (nameNullOrEmpty(method.getExchange()))
- {
- exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Delete not allowed for default exchange");
- return;
- }
-
Exchange exchange = getExchange(session, method.getExchange());
if(exchange == null)
@@ -647,16 +618,6 @@ public class ServerSessionDelegate exten
}
}
- private boolean nameNullOrEmpty(String name)
- {
- if(name == null || name.length() == 0)
- {
- return true;
- }
-
- return false;
- }
-
private boolean isStandardExchange(Exchange exchange, Collection<ExchangeType<? extends Exchange>> registeredTypes)
{
for(ExchangeType type : registeredTypes)
@@ -703,9 +664,9 @@ public class ServerSessionDelegate exten
{
exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
}
- else if (nameNullOrEmpty(method.getExchange()))
+ else if (!method.hasExchange())
{
- exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange");
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set");
}
/*
else if (!method.hasBindingKey())
@@ -774,9 +735,9 @@ public class ServerSessionDelegate exten
{
exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
}
- else if (nameNullOrEmpty(method.getExchange()))
+ else if (!method.hasExchange())
{
- exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Unbind not allowed for default exchange");
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set");
}
else if (!method.hasBindingKey())
{
@@ -806,6 +767,9 @@ public class ServerSessionDelegate exten
}
}
}
+
+
+ super.exchangeUnbind(session, method);
}
@Override
@@ -1005,10 +969,10 @@ public class ServerSessionDelegate exten
}
- if (method.hasAutoDelete()
- && method.getAutoDelete()
- && method.hasExclusive()
- && method.getExclusive())
+ if(method.hasAutoDelete()
+ && method.getAutoDelete()
+ && method.hasExclusive()
+ && method.getExclusive())
{
final AMQQueue q = queue;
final ServerSession.Task deleteQueueTask = new ServerSession.Task()
@@ -1035,23 +999,23 @@ public class ServerSessionDelegate exten
}
});
}
- if (method.hasExclusive()
- && method.getExclusive())
+ else if(method.getExclusive())
{
final AMQQueue q = queue;
final ServerSession.Task removeExclusive = new ServerSession.Task()
{
+
public void doTask(ServerSession session)
{
- q.setAuthorizationHolder(null);
+ q.setPrincipalHolder(null);
q.setExclusiveOwningSession(null);
}
};
final ServerSession s = (ServerSession) session;
- q.setExclusiveOwningSession(s);
s.addSessionCloseTask(removeExclusive);
queue.addQueueDeleteTask(new AMQQueue.Task()
{
+
public void doTask(AMQQueue queue) throws AMQException
{
s.removeSessionCloseTask(removeExclusive);
@@ -1065,7 +1029,7 @@ public class ServerSessionDelegate exten
}
}
}
- else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session)))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1113,7 +1077,7 @@ public class ServerSessionDelegate exten
}
else
{
- if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
+ if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
@@ -1259,8 +1223,6 @@ public class ServerSessionDelegate exten
@Override
public void closed(Session session)
{
- setThreadSubject(session);
-
for(Subscription_0_10 sub : getSubscriptions(session))
{
((ServerSession)session).unregister(sub);
@@ -1279,9 +1241,4 @@ public class ServerSessionDelegate exten
return ((ServerSession)session).getSubscriptions();
}
- private void setThreadSubject(Session session)
- {
- final ServerConnection scon = (ServerConnection) session.getConnection();
- SecurityManager.setThreadSubject(scon.getAuthorizedSubject());
- }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Fri Oct 21 01:19:00 2011
@@ -50,11 +50,6 @@ public class AutoCommitTransaction imple
_transactionLog = transactionLog;
}
- public long getTransactionStartTime()
- {
- return 0L;
- }
-
/**
* Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
* by the caller are executed immediately.
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Fri Oct 21 01:19:00 2011
@@ -20,23 +20,18 @@ package org.apache.qpid.server.txn;
*
*/
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A concrete implementation of ServerTransaction where enqueue/dequeue
@@ -46,28 +41,17 @@ import org.slf4j.LoggerFactory;
*/
public class LocalTransaction implements ServerTransaction
{
- protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class);
+ protected static final Logger _logger = Logger.getLogger(LocalTransaction.class);
private final List<Action> _postTransactionActions = new ArrayList<Action>();
private volatile TransactionLog.Transaction _transaction;
private TransactionLog _transactionLog;
- private long _txnStartTime = 0L;
public LocalTransaction(TransactionLog transactionLog)
{
_transactionLog = transactionLog;
}
-
- public boolean inTransaction()
- {
- return _transaction != null;
- }
-
- public long getTransactionStartTime()
- {
- return _txnStartTime;
- }
public void addPostTransactionAction(Action postTransactionAction)
{
@@ -105,6 +89,7 @@ public class LocalTransaction implements
try
{
+
for(QueueEntry entry : queueEntries)
{
ServerMessage message = entry.getMessage();
@@ -128,6 +113,7 @@ public class LocalTransaction implements
_logger.error("Error during message dequeues", e);
tidyUpOnError(e);
}
+
}
private void tidyUpOnError(Exception e)
@@ -154,7 +140,8 @@ public class LocalTransaction implements
}
finally
{
- resetDetails();
+ _transaction = null;
+ _postTransactionActions.clear();
}
}
@@ -206,11 +193,6 @@ public class LocalTransaction implements
{
_postTransactionActions.add(postTransactionAction);
- if (_txnStartTime == 0L)
- {
- _txnStartTime = System.currentTimeMillis();
- }
-
if(message.isPersistent())
{
try
@@ -266,14 +248,17 @@ public class LocalTransaction implements
}
finally
{
- resetDetails();
+ _transaction = null;
+ _postTransactionActions.clear();
}
+
}
public void rollback()
{
try
{
+
if(_transaction != null)
{
_transaction.abortTran();
@@ -295,15 +280,9 @@ public class LocalTransaction implements
}
finally
{
- resetDetails();
+ _transaction = null;
+ _postTransactionActions.clear();
}
}
}
-
- private void resetDetails()
- {
- _transaction = null;
- _postTransactionActions.clear();
- _txnStartTime = 0L;
- }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Fri Oct 21 01:19:00 2011
@@ -52,13 +52,6 @@ public interface ServerTransaction
public void onRollback();
}
- /**
- * Return the time the current transaction started.
- *
- * @return the time this transaction started or 0 if not in a transaction
- */
- long getTransactionStartTime();
-
/**
* Register an Action for execution after transaction commit or rollback. Actions
* will be executed in the order in which they are registered.
Propchange: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 01:19:00 2011
@@ -1,3 +1 @@
/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:930288
-/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1072051-1185907
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java Fri Oct 21 01:19:00 2011
@@ -63,10 +63,6 @@ public abstract class HouseKeepingTask i
{
_logger.warn(this.getClass().getSimpleName() + " throw exception: " + e, e);
}
- finally
- {
- CurrentActor.remove();
- }
}
public VirtualHost getVirtualHost()
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Oct 21 01:19:00 2011
@@ -20,28 +20,30 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.util.UUID;
-
import org.apache.qpid.common.Closeable;
-import org.apache.qpid.server.binding.BindingFactory;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.VirtualHostConfig;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfig;
+import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.binding.BindingFactory;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.TimerTask;
+import java.util.concurrent.FutureTask;
-public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
+public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable
{
IConnectionRegistry getConnectionRegistry();
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Fri Oct 21 01:19:00 2011
@@ -43,10 +43,7 @@ import org.apache.qpid.framing.FieldTabl
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.util.ByteBufferInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -239,14 +236,7 @@ public class VirtualHostConfigRecoveryHa
FieldTable argumentsFT = null;
if(buf != null)
{
- try
- {
- argumentsFT = new FieldTable(new DataInputStream(new ByteBufferInputStream(buf)),buf.limit());
- }
- catch (IOException e)
- {
- throw new RuntimeException("IOException should not be thrown here", e);
- }
+ argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit());
}
BindingFactory bf = _virtualHost.getBindingFactory();
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Fri Oct 21 01:19:00 2011
@@ -20,21 +20,23 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -61,8 +63,6 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -71,7 +71,7 @@ import org.apache.qpid.server.registry.A
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
@@ -99,7 +99,7 @@ public class VirtualHostImpl implements
private AMQBrokerManagerMBean _brokerMBean;
- private final AuthenticationManager _authenticationManager;
+ private AuthenticationManager _authenticationManager;
private SecurityManager _securityManager;
@@ -111,8 +111,6 @@ public class VirtualHostImpl implements
private BrokerConfig _broker;
private UUID _id;
- private boolean _statisticsEnabled = false;
- private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final long _createTime = System.currentTimeMillis();
private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
@@ -163,12 +161,12 @@ public class VirtualHostImpl implements
public String getObjectInstanceName()
{
- return ObjectName.quote(_name);
+ return _name.toString();
}
public String getName()
{
- return _name;
+ return _name.toString();
}
public VirtualHostImpl getVirtualHost()
@@ -177,11 +175,22 @@ public class VirtualHostImpl implements
}
}
- public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
+ public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
+ {
+ this(appRegistry, hostConfig, null);
+ }
+
+
+ public VirtualHostImpl(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
+ {
+ this(ApplicationRegistry.getInstance(),hostConfig,store);
+ }
+
+ private VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
{
if (hostConfig == null)
{
- throw new IllegalArgumentException("HostConfig cannot be null");
+ throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
}
_appRegistry = appRegistry;
@@ -235,28 +244,21 @@ public class VirtualHostImpl implements
initialiseMessageStore(hostConfig);
}
- _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager();
+ _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, _configuration);
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
- initialiseHouseKeeping(hostConfig.getHousekeepingCheckPeriod());
-
- initialiseStatistics();
+ initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
}
- /**
- * Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers
- * and checking for idle or open transactions that have exceeded the permitted thresholds.
- *
- * @param period
- */
private void initialiseHouseKeeping(long period)
{
+ /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
if (period != 0L)
{
- class VirtualHostHouseKeepingTask extends HouseKeepingTask
+ class ExpiredMessagesTask extends HouseKeepingTask
{
- public VirtualHostHouseKeepingTask(VirtualHost vhost)
+ public ExpiredMessagesTask(VirtualHost vhost)
{
super(vhost);
}
@@ -279,29 +281,18 @@ public class VirtualHostImpl implements
// house keeping task from running.
}
}
- for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
- {
- _logger.debug("Checking for long running open transactions on connection " + connection);
- for (AMQSessionModel session : connection.getSessionModels())
- {
- _logger.debug("Checking for long running open transactions on session " + session);
- try
- {
- session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
- _configuration.getTransactionTimeoutOpenClose(),
- _configuration.getTransactionTimeoutIdleWarn(),
- _configuration.getTransactionTimeoutIdleClose());
- }
- catch (Exception e)
- {
- _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
- }
- }
- }
}
}
- scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask(this));
+ scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this));
+
+ class ForceChannelClosuresTask extends TimerTask
+ {
+ public void run()
+ {
+ _connectionRegistry.expireClosedChannels();
+ }
+ }
Map<String, VirtualHostPluginFactory> plugins =
ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
@@ -455,57 +446,46 @@ public class VirtualHostImpl implements
private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
- String queueName = queue.getName();
if (queue.isDurable())
{
getDurableConfigurationStore().createQueue(queue);
}
- //get the exchange name (returns default exchange name if none was specified)
String exchangeName = queueConfiguration.getExchange();
- Exchange exchange = _exchangeRegistry.getExchange(exchangeName);
+ Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
+
+ if (exchange == null)
+ {
+ exchange = _exchangeRegistry.getDefaultExchange();
+ }
+
if (exchange == null)
{
- throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName);
+ throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
}
- Exchange defaultExchange = _exchangeRegistry.getDefaultExchange();
-
- //get routing keys in configuration (returns empty list if none are defined)
- List<?> routingKeys = queueConfiguration.getRoutingKeys();
+ List routingKeys = queueConfiguration.getRoutingKeys();
+ if (routingKeys == null || routingKeys.isEmpty())
+ {
+ routingKeys = Collections.singletonList(queue.getNameShortString());
+ }
for (Object routingKeyNameObj : routingKeys)
{
- String routingKey = String.valueOf(routingKeyNameObj);
-
- if (exchange.equals(defaultExchange) && !queueName.equals(routingKey))
+ AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
+ if (_logger.isInfoEnabled())
{
- throw new ConfigurationException("Illegal attempt to bind queue '" + queueName +
- "' to the default exchange with a key other than the queue name: " + routingKey);
+ _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this);
}
-
- configureBinding(queue, exchange, routingKey);
- }
-
- if (!exchange.equals(defaultExchange))
- {
- //bind the queue to the named exchange using its name
- configureBinding(queue, exchange, queueName);
+ _bindingFactory.addBinding(routingKey.toString(), queue, exchange, null);
}
- //ensure the queue is bound to the default exchange using its name
- configureBinding(queue, defaultExchange, queueName);
- }
-
- private void configureBinding(AMQQueue queue, Exchange exchange, String routingKey) throws AMQException
- {
- if (_logger.isInfoEnabled())
+ if (exchange != _exchangeRegistry.getDefaultExchange())
{
- _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + exchange.getName());
+ _bindingFactory.addBinding(queue.getNameShortString().toString(), queue, exchange, null);
}
- _bindingFactory.addBinding(routingKey, queue, exchange, null);
}
public String getName()
@@ -647,80 +627,6 @@ public class VirtualHostImpl implements
{
return _bindingFactory;
}
-
- public void registerMessageDelivered(long messageSize)
- {
- if (isStatisticsEnabled())
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- }
- _appRegistry.registerMessageDelivered(messageSize);
- }
-
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- if (isStatisticsEnabled())
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- }
- _appRegistry.registerMessageReceived(messageSize, timestamp);
- }
-
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return _messagesReceived;
- }
-
- public StatisticsCounter getDataReceiptStatistics()
- {
- return _dataReceived;
- }
-
- public StatisticsCounter getMessageDeliveryStatistics()
- {
- return _messagesDelivered;
- }
-
- public StatisticsCounter getDataDeliveryStatistics()
- {
- return _dataDelivered;
- }
-
- public void resetStatistics()
- {
- _messagesDelivered.reset();
- _dataDelivered.reset();
- _messagesReceived.reset();
- _dataReceived.reset();
-
- for (AMQConnectionModel connection : _connectionRegistry.getConnections())
- {
- connection.resetStatistics();
- }
- }
-
- public void initialiseStatistics()
- {
- setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
- _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
-
- _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
- _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
- _messagesReceived = new StatisticsCounter("messages-received-" + getName());
- _dataReceived = new StatisticsCounter("bytes-received-" + getName());
- }
-
- public boolean isStatisticsEnabled()
- {
- return _statisticsEnabled;
- }
-
- public void setStatisticsEnabled(boolean enabled)
- {
- _statisticsEnabled = enabled;
- }
public void createBrokerConnection(final String transport,
final String host,
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java Fri Oct 21 01:19:00 2011
@@ -192,7 +192,7 @@ public class MessageStoreTool
if (_initialised)
{
- ApplicationRegistry.remove();
+ ApplicationRegistry.remove(1);
}
_console.println("...exiting");
@@ -274,7 +274,7 @@ public class MessageStoreTool
{
ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(configFile);
- ApplicationRegistry.remove();
+ ApplicationRegistry.remove(1);
ApplicationRegistry.initialise(registry);
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Fri Oct 21 01:19:00 2011
@@ -364,7 +364,7 @@ public class Show extends AbstractComman
{
if(msg instanceof AMQMessage)
{
- headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().getProperties());
+ headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().properties);
}
}
catch (AMQException e)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org