You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC
svn commit: r686136 [8/17] - in /incubator/qpid/branches/qpid.0-10/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/bin/ broker/etc/ broker/...
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Thu Aug 14 20:40:49 2008
@@ -1,173 +1,184 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.log4j.Logger;
-
-import java.util.Set;
-import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicReference;
-
+import org.apache.qpid.server.subscription.Subscription;
-public class QueueEntry
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public interface QueueEntry extends Comparable<QueueEntry>
{
- /**
- * Used for debugging purposes.
- */
- private static final Logger _log = Logger.getLogger(QueueEntry.class);
-
- private final AMQQueue _queue;
- private final AMQMessage _message;
- private Set<Subscription> _rejectedBy = null;
- private AtomicReference<Object> _owner = new AtomicReference<Object>();
-
-
- public QueueEntry(AMQQueue queue, AMQMessage message)
+ public static enum State
{
- _queue = queue;
- _message = message;
+ AVAILABLE,
+ ACQUIRED,
+ EXPIRED,
+ DEQUEUED,
+ DELETED
}
-
- public AMQQueue getQueue()
+ public static interface StateChangeListener
{
- return _queue;
+ public void stateChanged(QueueEntry entry, State oldSate, State newState);
}
- public AMQMessage getMessage()
+ public abstract class EntryState
{
- return _message;
- }
+ private EntryState()
+ {
+ }
- public long getSize()
- {
- return getMessage().getSize();
+ public abstract State getState();
}
- public boolean getDeliveredToConsumer()
- {
- return getMessage().getDeliveredToConsumer();
- }
- public boolean expired() throws AMQException
+ public final class AvailableState extends EntryState
{
- return getMessage().expired(_queue);
- }
- public boolean isTaken()
- {
- return _owner.get() != null;
+ public State getState()
+ {
+ return State.AVAILABLE;
+ }
}
- public boolean taken(Subscription sub)
- {
- return !(_owner.compareAndSet(null, sub == null ? this : sub));
- }
- public void setDeliveredToConsumer()
+ public final class DequeuedState extends EntryState
{
- getMessage().setDeliveredToConsumer();
- }
- public void release()
- {
- _owner.set(null);
+ public State getState()
+ {
+ return State.DEQUEUED;
+ }
}
- public String debugIdentity()
- {
- return getMessage().debugIdentity();
- }
- public void process(StoreContext storeContext, boolean deliverFirst) throws AMQException
+ public final class DeletedState extends EntryState
{
- _queue.process(storeContext, this, deliverFirst);
- }
- public void checkDeliveredToConsumer() throws NoConsumersException
- {
- _message.checkDeliveredToConsumer();
+ public State getState()
+ {
+ return State.DELETED;
+ }
}
- public void setRedelivered(boolean b)
+ public final class ExpiredState extends EntryState
{
- getMessage().setRedelivered(b);
- }
- public Subscription getDeliveredSubscription()
- {
- synchronized (this)
+ public State getState()
{
- Object owner = _owner.get();
- if (owner instanceof Subscription)
- {
- return (Subscription) owner;
- }
- else
- {
- return null;
- }
+ return State.EXPIRED;
}
}
- public void reject()
+
+ public final class NonSubscriptionAcquiredState extends EntryState
{
- reject(getDeliveredSubscription());
+ public State getState()
+ {
+ return State.ACQUIRED;
+ }
}
- public void reject(Subscription subscription)
+ public final class SubscriptionAcquiredState extends EntryState
{
- if (subscription != null)
- {
- if (_rejectedBy == null)
- {
- _rejectedBy = new HashSet<Subscription>();
- }
+ private final Subscription _subscription;
- _rejectedBy.add(subscription);
- }
- else
+ public SubscriptionAcquiredState(Subscription subscription)
{
- _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+ _subscription = subscription;
}
- }
- public boolean isRejectedBy(Subscription subscription)
- {
- boolean rejected = _rejectedBy != null;
- if (rejected) // We have subscriptions that rejected this message
+ public State getState()
{
- return _rejectedBy.contains(subscription);
+ return State.ACQUIRED;
}
- else // This messasge hasn't been rejected yet.
+
+ public Subscription getSubscription()
{
- return rejected;
+ return _subscription;
}
}
+ final static EntryState AVAILABLE_STATE = new AvailableState();
+ final static EntryState DELETED_STATE = new DeletedState();
+ final static EntryState DEQUEUED_STATE = new DequeuedState();
+ final static EntryState EXPIRED_STATE = new ExpiredState();
+ final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
+
+
+
+
+ AMQQueue getQueue();
+
+ AMQMessage getMessage();
+
+ long getSize();
+
+ boolean getDeliveredToConsumer();
+
+ boolean expired() throws AMQException;
+
+ boolean isAcquired();
+
+ boolean acquire();
+ boolean acquire(Subscription sub);
+
+ boolean delete();
+ boolean isDeleted();
+
+ boolean acquiredBySubscription();
+
+ void setDeliveredToSubscription();
+
+ void release();
+
+ String debugIdentity();
+
+ boolean immediateAndNotDelivered();
+
+ void setRedelivered(boolean b);
+
+ Subscription getDeliveredSubscription();
+
+ void reject();
+
+ void reject(Subscription subscription);
+
+ boolean isRejectedBy(Subscription subscription);
+
+ void requeue(StoreContext storeContext) throws AMQException;
+
+ void dequeue(final StoreContext storeContext) throws FailedDequeueException;
+
+ void dispose(final StoreContext storeContext) throws MessageCleanupException;
+
+ void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException;
+
+ boolean isQueueDeleted();
+
+ void addStateChangeListener(StateChangeListener listener);
+ boolean removeStateChangeListener(StateChangeListener listener);
}
Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
('svn:eol-style' removed)
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java Thu Aug 14 20:40:49 2008
@@ -1,27 +1,27 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-
-public interface QueueNotificationListener
-{
- void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+
+public interface QueueNotificationListener
+{
+ void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
+}
Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
('svn:eol-style' removed)
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Thu Aug 14 20:40:49 2008
@@ -26,7 +26,6 @@
import java.util.List;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
@@ -48,29 +47,35 @@
private final MessageStore _messageStore;
+ private final Long _messageId;
private long _arrivalTime;
-
- public WeakReferenceMessageHandle(MessageStore messageStore)
+ public WeakReferenceMessageHandle(final Long messageId, MessageStore messageStore)
{
+ _messageId = messageId;
_messageStore = messageStore;
}
- public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException
+ public ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException
{
ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null);
if (chb == null)
{
- MessageMetaData mmd = loadMessageMetaData(context, messageId);
+ MessageMetaData mmd = loadMessageMetaData(context);
chb = mmd.getContentHeaderBody();
}
return chb;
}
- private MessageMetaData loadMessageMetaData(StoreContext context, Long messageId)
+ public Long getMessageId()
+ {
+ return _messageId;
+ }
+
+ private MessageMetaData loadMessageMetaData(StoreContext context)
throws AMQException
{
- MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
+ MessageMetaData mmd = _messageStore.getMessageMetaData(context, _messageId);
populateFromMessageMetaData(mmd);
return mmd;
}
@@ -82,11 +87,11 @@
_messagePublishInfo = new WeakReference<MessagePublishInfo>(mmd.getMessagePublishInfo());
}
- public int getBodyCount(StoreContext context, Long messageId) throws AMQException
+ public int getBodyCount(StoreContext context) throws AMQException
{
if (_contentBodies == null)
{
- MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
+ MessageMetaData mmd = _messageStore.getMessageMetaData(context, _messageId);
int chunkCount = mmd.getContentChunkCount();
_contentBodies = new ArrayList<WeakReference<ContentChunk>>(chunkCount);
for (int i = 0; i < chunkCount; i++)
@@ -97,12 +102,12 @@
return _contentBodies.size();
}
- public long getBodySize(StoreContext context, Long messageId) throws AMQException
+ public long getBodySize(StoreContext context) throws AMQException
{
- return getContentHeaderBody(context, messageId).bodySize;
+ return getContentHeaderBody(context).bodySize;
}
- public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentChunk getContentChunk(StoreContext context, int index) throws AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
@@ -113,7 +118,7 @@
ContentChunk cb = wr.get();
if (cb == null)
{
- cb = _messageStore.getContentBodyChunk(context, messageId, index);
+ cb = _messageStore.getContentBodyChunk(context, _messageId, index);
_contentBodies.set(index, new WeakReference<ContentChunk>(cb));
}
return cb;
@@ -123,12 +128,11 @@
* Content bodies are set <i>before</i> the publish and header frames
*
* @param storeContext
- * @param messageId
* @param contentChunk
* @param isLastContentBody
* @throws AMQException
*/
- public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
+ public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
{
if (_contentBodies == null && isLastContentBody)
{
@@ -142,16 +146,16 @@
}
}
_contentBodies.add(new WeakReference<ContentChunk>(contentChunk));
- _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1,
+ _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1,
contentChunk, isLastContentBody);
}
- public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException
+ public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException
{
MessagePublishInfo bpb = (_messagePublishInfo != null ? _messagePublishInfo.get() : null);
if (bpb == null)
{
- MessageMetaData mmd = loadMessageMetaData(context, messageId);
+ MessageMetaData mmd = loadMessageMetaData(context);
bpb = mmd.getMessagePublishInfo();
}
@@ -168,12 +172,9 @@
_redelivered = redelivered;
}
- public boolean isPersistent(StoreContext context, Long messageId) throws AMQException
+ public boolean isPersistent()
{
- //todo remove literal values to a constant file such as AMQConstants in common
- ContentHeaderBody chb = getContentHeaderBody(context, messageId);
- return chb.properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
+ return true;
}
/**
@@ -183,7 +184,7 @@
* @param contentHeaderBody
* @throws AMQException
*/
- public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo publishBody,
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo publishBody,
ContentHeaderBody contentHeaderBody)
throws AMQException
{
@@ -199,24 +200,15 @@
MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies.size(), arrivalTime);
- _messageStore.storeMessageMetaData(storeContext, messageId, mmd);
+ _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
- populateFromMessageMetaData(mmd);
- }
- public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
- {
- _messageStore.removeMessage(storeContext, messageId);
- }
-
- public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
- {
- _messageStore.enqueueMessage(storeContext, queue.getName(), messageId);
+ populateFromMessageMetaData(mmd);
}
- public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
+ public void removeMessage(StoreContext storeContext) throws AMQException
{
- _messageStore.dequeueMessage(storeContext, queue.getName(), messageId);
+ _messageStore.removeMessage(storeContext, _messageId);
}
public long getArrivalTime()
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Thu Aug 14 20:40:49 2008
@@ -24,9 +24,17 @@
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.mina.common.IoAcceptor;
import java.util.HashMap;
import java.util.Map;
+import java.net.InetSocketAddress;
/**
* An abstract application registry that provides access to configuration information and handles the
@@ -36,7 +44,7 @@
*/
public abstract class ApplicationRegistry implements IApplicationRegistry
{
- private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
+ protected static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
private static Map<Integer, IApplicationRegistry> _instanceMap = new HashMap<Integer, IApplicationRegistry>();
@@ -48,6 +56,20 @@
public static final String DEFAULT_APPLICATION_REGISTRY = "org.apache.qpid.server.util.NullApplicationRegistry";
public static String _APPLICATION_REGISTRY = DEFAULT_APPLICATION_REGISTRY;
+ protected final Map<InetSocketAddress, IoAcceptor> _acceptors = new HashMap<InetSocketAddress, IoAcceptor>();
+
+ protected ManagedObjectRegistry _managedObjectRegistry;
+
+ protected AuthenticationManager _authenticationManager;
+
+ protected VirtualHostRegistry _virtualHostRegistry;
+
+ protected ACLPlugin _accessManager;
+
+ protected PrincipalDatabaseManager _databaseManager;
+
+ protected PluginManager _pluginManager;
+
static
{
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
@@ -57,7 +79,6 @@
{
public void run()
{
- _logger.info("Shutting down application registries...");
removeAll();
}
}
@@ -90,16 +111,29 @@
}
}
+ /**
+ * Method to cleanly shut down the specified registry running in this JVM
+ *
+ * @param instanceID the instance to shut down
+ */
+
public static void remove(int instanceID)
{
try
{
- _instanceMap.get(instanceID).close();
+ IApplicationRegistry instance = _instanceMap.get(instanceID);
+ if (instance != null)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Shuting down ApplicationRegistry(" + instanceID + "):" + instance);
+ }
+ instance.close();
+ }
}
catch (Exception e)
{
- _logger.error("Error shutting down message store: " + e, e);
-
+ _logger.error("Error shutting down Application Registry(" + instanceID + "): " + e, e);
}
finally
{
@@ -107,6 +141,7 @@
}
}
+ /** Method to cleanly shut down all registries currently running in this JVM */
public static void removeAll()
{
Object[] keys = _instanceMap.keySet().toArray();
@@ -158,15 +193,36 @@
public void close() throws Exception
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Shutting down ApplicationRegistry:"+this);
+ }
+
+ //Stop incoming connections
+ unbind();
+
+ //Shutdown virtualhosts
for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
{
virtualHost.close();
}
// close the rmi registry(if any) started for management
- if (getInstance().getManagedObjectRegistry() != null)
+ if (getManagedObjectRegistry() != null)
+ {
+ getManagedObjectRegistry().close();
+ }
+ }
+
+ private void unbind()
+ {
+ synchronized (_acceptors)
{
- getInstance().getManagedObjectRegistry().close();
+ for (InetSocketAddress bindAddress : _acceptors.keySet())
+ {
+ IoAcceptor acceptor = _acceptors.get(bindAddress);
+ acceptor.unbind(bindAddress);
+ }
}
}
@@ -175,6 +231,14 @@
return _configuration;
}
+ public void addAcceptor(InetSocketAddress bindAddress, IoAcceptor acceptor)
+ {
+ synchronized (_acceptors)
+ {
+ _acceptors.put(bindAddress, acceptor);
+ }
+ }
+
public <T> T getConfiguredObject(Class<T> instanceType)
{
T instance = (T) _configuredObjects.get(instanceType);
@@ -195,9 +259,39 @@
return instance;
}
-
public static void setDefaultApplicationRegistry(String clazz)
{
_APPLICATION_REGISTRY = clazz;
}
+
+ public VirtualHostRegistry getVirtualHostRegistry()
+ {
+ return _virtualHostRegistry;
+ }
+
+ public ACLPlugin getAccessManager()
+ {
+ return _accessManager;
+ }
+
+ public ManagedObjectRegistry getManagedObjectRegistry()
+ {
+ return _managedObjectRegistry;
+ }
+
+ public PrincipalDatabaseManager getDatabaseManager()
+ {
+ return _databaseManager;
+ }
+
+ public AuthenticationManager getAuthenticationManager()
+ {
+ return _authenticationManager;
+ }
+
+ public PluginManager getPluginManager()
+ {
+ return _pluginManager;
+ }
+
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Thu Aug 14 20:40:49 2008
@@ -48,23 +48,6 @@
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
- private ManagedObjectRegistry _managedObjectRegistry;
-
- private AuthenticationManager _authenticationManager;
-
- private ACLPlugin _accessManager;
-
- private PrincipalDatabaseManager _databaseManager;
-
- private VirtualHostRegistry _virtualHostRegistry;
-
- private PluginManager _pluginManager;
-
-
- public ConfigurationFileApplicationRegistry(Configuration configuration)
- {
- super(configuration);
- }
public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
{
@@ -81,7 +64,7 @@
}
}
- public static final Configuration config(File url) throws ConfigurationException
+ private static final Configuration config(File url) throws ConfigurationException
{
// We have to override the interpolate methods so that
// interpolation takes place accross the entirety of the
@@ -150,39 +133,9 @@
}
}
-
- public VirtualHostRegistry getVirtualHostRegistry()
- {
- return _virtualHostRegistry;
- }
-
- public ACLPlugin getAccessManager()
- {
- return _accessManager;
- }
-
- public ManagedObjectRegistry getManagedObjectRegistry()
- {
- return _managedObjectRegistry;
- }
-
- public PrincipalDatabaseManager getDatabaseManager()
- {
- return _databaseManager;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
public Collection<String> getVirtualHostNames()
{
return getConfiguration().getList("virtualhosts.virtualhost.name");
}
- public PluginManager getPluginManager()
- {
- return _pluginManager;
- }
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Thu Aug 14 20:40:49 2008
@@ -21,6 +21,7 @@
package org.apache.qpid.server.registry;
import java.util.Collection;
+import java.net.InetSocketAddress;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.management.ManagedObjectRegistry;
@@ -29,6 +30,7 @@
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.mina.common.IoAcceptor;
public interface IApplicationRegistry
{
@@ -39,6 +41,10 @@
*/
void initialise() throws Exception;
+ /**
+ * Shutdown this Registry
+ * @throws Exception - //fixme needs to be made more specific
+ */
void close() throws Exception;
/**
@@ -71,5 +77,12 @@
ACLPlugin getAccessManager();
PluginManager getPluginManager();
-
+
+ /**
+ * Register any acceptors for this registry
+ * @param bindAddress The address that the acceptor has been bound with
+ * @param acceptor The acceptor in use
+ */
+ void addAcceptor(InetSocketAddress bindAddress, IoAcceptor acceptor);
+
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java Thu Aug 14 20:40:49 2008
@@ -27,23 +27,23 @@
GRANTED, REFUSED
}
- StringBuilder _authorizer;
- AccessStatus _status;
+ private String _authorizer;
+ private AccessStatus _status;
public AccessResult(ACLPlugin authorizer, AccessStatus status)
{
_status = status;
- _authorizer = new StringBuilder(authorizer.getPluginName());
+ _authorizer = authorizer.getPluginName();
}
public void setAuthorizer(ACLPlugin authorizer)
{
- _authorizer.append(authorizer.getPluginName());
+ _authorizer += authorizer.getPluginName();
}
public String getAuthorizer()
{
- return _authorizer.toString();
+ return _authorizer;
}
public void setStatus(AccessStatus status)
@@ -58,8 +58,7 @@
public void addAuthorizer(ACLPlugin accessManager)
{
- _authorizer.insert(0, "->");
- _authorizer.insert(0, accessManager.getPluginName());
+ _authorizer = accessManager.getPluginName() + "->" + _authorizer;
}
Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
('svn:eol-style' removed)
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java Thu Aug 14 20:40:49 2008
@@ -28,13 +28,9 @@
import org.apache.qpid.server.security.access.Accessable;
import org.apache.qpid.server.security.access.Permission;
import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
public class AllowAll implements ACLPlugin
{
-
- private static final Logger _logger = ACLManager.getLogger();
-
public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters)
{
if (ACLManager.getLogger().isDebugEnabled())
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java Thu Aug 14 20:40:49 2008
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.security.auth;
+import javax.security.sasl.SaslException;
+
public class AuthenticationResult
{
public enum AuthenticationStatus
@@ -29,15 +31,33 @@
public AuthenticationStatus status;
public byte[] challenge;
+
+ private Exception cause;
+
+ public AuthenticationResult(AuthenticationStatus status)
+ {
+ this(null, status, null);
+ }
public AuthenticationResult(byte[] challenge, AuthenticationStatus status)
{
+ this(challenge, status, null);
+ }
+
+ public AuthenticationResult(AuthenticationStatus error, Exception cause)
+ {
+ this(null, error, cause);
+ }
+
+ public AuthenticationResult(byte[] challenge, AuthenticationStatus status, Exception cause)
+ {
this.status = status;
this.challenge = challenge;
+ this.cause = cause;
}
- public AuthenticationResult(AuthenticationStatus status)
+ public Exception getCause()
{
- this.status = status;
+ return cause;
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Thu Aug 14 20:40:49 2008
@@ -230,12 +230,7 @@
}
catch (SaslException e)
{
- return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR);
+ return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
}
}
-
- public AuthenticationResult isAuthorize(VirtualHost vhost, String username)
- {
- return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR);
- }
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java Thu Aug 14 20:40:49 2008
@@ -68,12 +68,15 @@
PasswordCallback passwordCb = new PasswordCallback("prompt", false);
// TODO: should not get pwd as a String but as a char array...
String pwd = (String) ft.getString("PASSWORD");
- passwordCb.setPassword(pwd.toCharArray());
AuthorizeCallback authzCb = new AuthorizeCallback(username, username);
Callback[] callbacks = new Callback[]{nameCb, passwordCb, authzCb};
_cbh.handle(callbacks);
- _complete = true;
- if (authzCb.isAuthorized())
+ String storedPwd = new String(passwordCb.getPassword());
+ if (storedPwd.equals(pwd))
+ {
+ _complete = true;
+ }
+ if (authzCb.isAuthorized() && _complete)
{
_authorizationId = authzCb.getAuthenticationID();
return null;
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServer.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServer.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServer.java Thu Aug 14 20:40:49 2008
@@ -72,17 +72,19 @@
// we do not care about the prompt but it throws if null
NameCallback nameCb = new NameCallback("prompt", authzid);
- // we do not care about the prompt but it throws if null
PasswordCallback passwordCb = new PasswordCallback("prompt", false);
// TODO: should not get pwd as a String but as a char array...
int passwordLen = response.length - authcidNullPosition - 1;
String pwd = new String(response, authcidNullPosition + 1, passwordLen, "utf8");
- passwordCb.setPassword(pwd.toCharArray());
AuthorizeCallback authzCb = new AuthorizeCallback(authzid, authzid);
Callback[] callbacks = new Callback[]{nameCb, passwordCb, authzCb};
_cbh.handle(callbacks);
- _complete = true;
- if (authzCb.isAuthorized())
+ String storedPwd = new String(passwordCb.getPassword());
+ if (storedPwd.equals(pwd))
+ {
+ _complete = true;
+ }
+ if (authzCb.isAuthorized() && _complete)
{
_authorizationId = authzCb.getAuthenticationID();
return null;
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Thu Aug 14 20:40:49 2008
@@ -26,9 +26,9 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
@@ -121,22 +121,28 @@
}
+
public void createQueue(AMQQueue queue) throws AMQException
{
+ // Not required to do anything
+ }
+
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+ {
// Not required to do anything
}
- public void removeQueue(AMQShortString name) throws AMQException
+ public void removeQueue(final AMQQueue queue) throws AMQException
{
// Not required to do anything
}
- public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
// Not required to do anything
}
- public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
// Not required to do anything
}
@@ -213,7 +219,12 @@
return bodyList.get(index);
}
- private void checkNotClosed() throws MessageStoreClosedException
+ public boolean isPersistent()
+ {
+ return false;
+ }
+
+ private void checkNotClosed() throws MessageStoreClosedException
{
if (_closed.get())
{
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Thu Aug 14 20:40:49 2008
@@ -27,8 +27,8 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
/**
@@ -136,35 +136,42 @@
void createQueue(AMQQueue queue) throws AMQException;
/**
- * Removes the specified queue from the persistent store.
+ * Makes the specified queue persistent.
*
- * @param name The queue to remove.
+ * @param queue The queue to store.
*
+ * @param arguments The additional arguments supplied for the queue
* @throws AMQException If the operation fails for any reason.
*/
- void removeQueue(AMQShortString name) throws AMQException;
+ void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException;
+
+ /**
+ * Removes the specified queue from the persistent store.
+ *
+ * @param queue The queue to remove.
+ * @throws AMQException If the operation fails for any reason.
+ */
+ void removeQueue(final AMQQueue queue) throws AMQException;
/**
* Places a message onto a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
- * @param name The name of the queue to place the message on.
+ * @param queue The queue to place the message on.
* @param messageId The message to enqueue.
- *
* @throws AMQException If the operation fails for any reason.
*/
- void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException;
+ void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
/**
* Extracts a message from a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
- * @param name The name of the queue to take the message from.
+ * @param queue The queue to take the message from.
* @param messageId The message to dequeue.
- *
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
- void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException;
+ void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
/**
* Begins a transactional context.
@@ -258,4 +265,12 @@
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
+
+ /**
+ * Is this store capable of persisting the data
+ *
+ * @return true if this store is capable of persisting data
+ */
+ boolean isPersistent();
+
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Thu Aug 14 20:40:49 2008
@@ -37,7 +37,7 @@
public StoreContext()
{
- _name = super.toString();
+ _name = "StoreContext";
}
public StoreContext(String name)
@@ -52,7 +52,10 @@
public void setPayload(Object payload)
{
- _logger.debug("public void setPayload(Object payload = " + payload + "): called");
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("public void setPayload(Object payload = " + payload + "): called");
+ }
_payload = payload;
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java Thu Aug 14 20:40:49 2008
@@ -97,6 +97,10 @@
defaultValue = "false")
public boolean _multiThreadNIO;
+ @Configured(path = "advanced.useWriteBiasedPool",
+ defaultValue = "false")
+ public boolean useBiasedWrites;
+
public IoAcceptor createAcceptor()
{
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Thu Aug 14 20:40:49 2008
@@ -24,18 +24,16 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.NoConsumersException;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-import java.util.LinkedList;
import java.util.List;
+import java.util.ArrayList;
/** A transactional context that only supports local transactions. */
public class LocalTransactionalContext implements TransactionalContext
@@ -44,7 +42,7 @@
private final TxnBuffer _txnBuffer = new TxnBuffer();
- private final List<DeliveryDetails> _postCommitDeliveryList = new LinkedList<DeliveryDetails>();
+ private final List<DeliveryAction> _postCommitDeliveryList = new ArrayList<DeliveryAction>();
/**
* We keep hold of the ack operation so that we can consolidate acks, i.e. multiple acks within a txn are
@@ -52,81 +50,120 @@
*/
private TxAck _ackOp;
- private List<RequiredDeliveryException> _returnMessages;
-
- private final MessageStore _messageStore;
-
- private final StoreContext _storeContext;
-
private boolean _inTran = false;
/** Are there messages to deliver. NOT Has the message been delivered */
private boolean _messageDelivered = false;
+ private final AMQChannel _channel;
+
- private static class DeliveryDetails
+ private abstract class DeliveryAction
{
- public QueueEntry entry;
- private boolean deliverFirst;
+ abstract public void process() throws AMQException;
+
+ }
+
+ private class RequeueAction extends DeliveryAction
+ {
+ public QueueEntry entry;
- public DeliveryDetails(QueueEntry entry, boolean deliverFirst)
+ public RequeueAction(QueueEntry entry)
{
this.entry = entry;
- this.deliverFirst = deliverFirst;
+ }
+
+ public void process() throws AMQException
+ {
+ entry.requeue(getStoreContext());
}
}
- public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext,
- List<RequiredDeliveryException> returnMessages)
+ private class PublishAction extends DeliveryAction
{
- _messageStore = messageStore;
- _storeContext = storeContext;
- _returnMessages = returnMessages;
- // _txnBuffer.enlist(new StoreMessageOperation(messageStore));
+ private final AMQQueue _queue;
+ private final AMQMessage _message;
+
+ public PublishAction(final AMQQueue queue, final AMQMessage message)
+ {
+ _queue = queue;
+ _message = message;
+ }
+
+ public void process() throws AMQException
+ {
+
+ _message.incrementReference();
+ try
+ {
+ QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
+
+ if(entry.immediateAndNotDelivered())
+ {
+ getReturnMessages().add(new NoConsumersException(_message));
+ }
+ }
+ finally
+ {
+ _message.decrementReference(getStoreContext());
+ }
+ }
+ }
+
+ public LocalTransactionalContext(final AMQChannel channel)
+ {
+ _channel = channel;
}
public StoreContext getStoreContext()
{
- return _storeContext;
+ return _channel.getStoreContext();
}
+ public List<RequiredDeliveryException> getReturnMessages()
+ {
+ return _channel.getReturnMessages();
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _channel.getMessageStore();
+ }
+
+
public void rollback() throws AMQException
{
- _txnBuffer.rollback(_storeContext);
+ _txnBuffer.rollback(getStoreContext());
// Hack to deal with uncommitted non-transactional writes
- if (_messageStore.inTran(_storeContext))
+ if (getMessageStore().inTran(getStoreContext()))
{
- _messageStore.abortTran(_storeContext);
+ getMessageStore().abortTran(getStoreContext());
_inTran = false;
}
_postCommitDeliveryList.clear();
}
- public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException
+ public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
{
// A publication will result in the enlisting of several
// TxnOps. The first is an op that will store the message.
// Following that (and ordering is important), an op will
// be added for every queue onto which the message is
- // enqueued. Finally a cleanup op will be added to decrement
- // the reference associated with the routing.
- // message.incrementReference();
- _postCommitDeliveryList.add(new DeliveryDetails(entry, deliverFirst));
+ // enqueued.
+ _postCommitDeliveryList.add(new PublishAction(queue, message));
_messageDelivered = true;
- _txnBuffer.enlist(new CleanupMessageOperation(entry.getMessage(), _returnMessages));
- /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
- if (_log.isDebugEnabled())
- {
- _log.debug("Incrementing ref count on message and enlisting cleanup operation - id " +
- message.getMessageId());
- }
- message.incrementReference();
+
+ }
+
+ public void requeue(QueueEntry entry) throws AMQException
+ {
+ _postCommitDeliveryList.add(new RequeueAction(entry));
_messageDelivered = true;
- */
}
+
private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
{
if (!unacknowledgedMessageMap.contains(deliveryTag))
@@ -147,10 +184,8 @@
// as new acks come in. If this is the first ack in the txn
// we will need to create and enlist the op.
if (_ackOp == null)
- {
-
+ {
_ackOp = new TxAck(unacknowledgedMessageMap);
-
_txnBuffer.enlist(_ackOp);
}
// update the op to include this ack request
@@ -189,7 +224,7 @@
_log.debug("Starting transaction on message store: " + this);
}
- _messageStore.beginTran(_storeContext);
+ getMessageStore().beginTran(getStoreContext());
_inTran = true;
}
}
@@ -212,22 +247,22 @@
if (_messageDelivered && _inTran)
{
- _txnBuffer.enlist(new StoreMessageOperation(_messageStore));
+ _txnBuffer.enlist(new StoreMessageOperation(getMessageStore()));
}
// fixme fail commit here ... QPID-440
try
{
- _txnBuffer.commit(_storeContext);
+ _txnBuffer.commit(getStoreContext());
}
finally
{
_messageDelivered = false;
- _inTran = _messageStore.inTran(_storeContext);
+ _inTran = getMessageStore().inTran(getStoreContext());
}
try
{
- postCommitDelivery(_returnMessages);
+ postCommitDelivery();
}
catch (AMQException e)
{
@@ -236,7 +271,7 @@
}
}
- private void postCommitDelivery(List<RequiredDeliveryException> returnMessages) throws AMQException
+ private void postCommitDelivery() throws AMQException
{
if (_log.isDebugEnabled())
{
@@ -245,18 +280,9 @@
try
{
- for (DeliveryDetails dd : _postCommitDeliveryList)
+ for (DeliveryAction dd : _postCommitDeliveryList)
{
- dd.entry.process(_storeContext, dd.deliverFirst);
-
- try
- {
- dd.entry.checkDeliveredToConsumer();
- }
- catch (NoConsumersException nce)
- {
- returnMessages.add(nce);
- }
+ dd.process();
}
}
finally
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Thu Aug 14 20:40:49 2008
@@ -22,19 +22,14 @@
import java.util.LinkedList;
import java.util.List;
-import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.NoConsumersException;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
@@ -49,6 +44,8 @@
/** Where to put undeliverable messages */
private final List<RequiredDeliveryException> _returnMessages;
+
+
private final MessageStore _messageStore;
private final StoreContext _storeContext;
@@ -57,12 +54,6 @@
private boolean _inTran;
public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel,
- List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks)
- {
- this(messageStore,storeContext,channel,returnMessages);
- }
-
- public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel,
List<RequiredDeliveryException> returnMessages)
{
_channel = channel;
@@ -97,19 +88,22 @@
// Does not apply to this context
}
- public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException
+ public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
{
- try
+ QueueEntry entry = queue.enqueue(_storeContext, message);
+
+ //following check implements the functionality
+ //required by the 'immediate' flag:
+ if(entry.immediateAndNotDelivered())
{
- entry.process(_storeContext, deliverFirst);
- //following check implements the functionality
- //required by the 'immediate' flag:
- entry.checkDeliveredToConsumer();
- }
- catch (NoConsumersException e)
- {
- _returnMessages.add(e);
+ _returnMessages.add(new NoConsumersException(entry.getMessage()));
}
+
+ }
+
+ public void requeue(QueueEntry entry) throws AMQException
+ {
+ entry.requeue(_storeContext);
}
public void acknowledgeMessage(final long deliveryTag, long lastDeliveryTag,
@@ -118,7 +112,7 @@
{
final boolean debug = _log.isDebugEnabled();
-
+ ;
if (multiple)
{
if (deliveryTag == 0)
@@ -130,7 +124,7 @@
unacknowledgedMessageMap.size());
unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
{
if (debug)
{
@@ -159,28 +153,13 @@
throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
}
- LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
- unacknowledgedMessageMap.drainTo(acked, deliveryTag);
- for (UnacknowledgedMessage msg : acked)
- {
- if (debug)
- {
- _log.debug("Discarding message: " + msg.getMessage().getMessageId());
- }
- if(msg.getMessage().isPersistent())
- {
- beginTranIfNecessary();
- }
-
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- msg.discard(_storeContext);
- }
+ unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext);
}
}
else
{
- UnacknowledgedMessage msg;
- msg = unacknowledgedMessageMap.remove(deliveryTag);
+ QueueEntry msg;
+ msg = unacknowledgedMessageMap.get(deliveryTag);
if (msg == null)
{
@@ -202,19 +181,20 @@
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(_storeContext);
+ unacknowledgedMessageMap.remove(deliveryTag);
+
+
if (debug)
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
msg.getMessage().getMessageId());
}
}
-
if(_inTran)
{
_messageStore.commitTran(_storeContext);
_inTran = false;
}
-
}
public void messageFullyReceived(boolean persistent) throws AMQException
@@ -228,6 +208,6 @@
public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException
{
- _channel.processReturns(protocolSession);
+ _channel.processReturns();
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java Thu Aug 14 20:40:49 2008
@@ -25,6 +25,7 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.StoreContext;
/**
@@ -106,18 +107,26 @@
void rollback() throws AMQException;
/**
- * Delivers the specified message to the specified queue. A 'deliverFirst' flag may be set if the message is a
- * redelivery, and should be placed on the front of the queue.
+ * Delivers the specified message to the specified queue.
*
* <p/>This is an 'enqueue' operation.
*
- * @param entry The message to deliver, and the queue to deliver to.
- * @param deliverFirst <tt>true</tt> to place the message on the front of the queue for redelivery, <tt>false</tt>
- * for normal FIFO message ordering.
- *
+ * @param queue The queue to deliver the message to
+ * @param message The message to deliver
* @throws AMQException If the message cannot be delivered for any reason.
*/
- void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException;
+ void deliver(final AMQQueue queue, AMQMessage message) throws AMQException;
+
+ /**
+ * Requeues the specified message entry (message queue pair)
+ *
+ *
+ * @param queueEntry The message,queue pair
+ *
+ * @throws AMQException If the message cannot be delivered for any reason.
+ */
+ void requeue(QueueEntry queueEntry) throws AMQException;
+
/**
* Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java Thu Aug 14 20:40:49 2008
@@ -42,19 +42,6 @@
public class NullApplicationRegistry extends ApplicationRegistry
{
- private ManagedObjectRegistry _managedObjectRegistry;
-
- private AuthenticationManager _authenticationManager;
-
- private VirtualHostRegistry _virtualHostRegistry;
-
- private ACLPlugin _accessManager;
-
- private PrincipalDatabaseManager _databaseManager;
-
- private PluginManager _pluginManager;
-
-
public NullApplicationRegistry()
{
super(new MapConfiguration(new HashMap()));
@@ -62,6 +49,8 @@
public void initialise() throws Exception
{
+ _logger.info("Initialising NullApplicationRegistry");
+
_configuration.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore");
Properties users = new Properties();
@@ -84,47 +73,11 @@
}
- public Configuration getConfiguration()
- {
- return _configuration;
- }
-
-
- public ManagedObjectRegistry getManagedObjectRegistry()
- {
- return _managedObjectRegistry;
- }
-
- public PrincipalDatabaseManager getDatabaseManager()
- {
- return _databaseManager;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
public Collection<String> getVirtualHostNames()
{
String[] hosts = {"test"};
return Arrays.asList(hosts);
}
-
- public VirtualHostRegistry getVirtualHostRegistry()
- {
- return _virtualHostRegistry;
- }
-
- public ACLPlugin getAccessManager()
- {
- return _accessManager;
- }
-
- public PluginManager getPluginManager()
- {
- return _pluginManager;
- }
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java Thu Aug 14 20:40:49 2008
@@ -1,44 +1,44 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import java.io.IOException;
-
-import org.apache.qpid.server.management.MBeanAttribute;
-
-/**
- * The management interface exposed to allow management of an Exchange.
- * @version 0.1
- */
-public interface ManagedVirtualHost
-{
- static final String TYPE = "VirtualHost";
-
- /**
- * Returns the name of the managed virtualHost.
- * @return the name of the exchange.
- * @throws java.io.IOException
- */
- @MBeanAttribute(name="Name", description= TYPE + " Name")
- String getName() throws IOException;
-
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.io.IOException;
+
+import org.apache.qpid.server.management.MBeanAttribute;
+
+/**
+ * The management interface exposed to allow management of a VirtualHost.
+ * @version 0.1
+ */
+public interface ManagedVirtualHost
+{
+ static final String TYPE = "VirtualHost";
+
+ /**
+ * Returns the name of the managed virtualHost.
+ * @return the name of the virtualHost.
+ * @throws java.io.IOException
+ */
+ @MBeanAttribute(name="Name", description= TYPE + " Name")
+ String getName() throws IOException;
+
+
+}
Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
('svn:eol-style' removed)
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Thu Aug 14 20:40:49 2008
@@ -26,6 +26,8 @@
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.connection.ConnectionRegistry;
+import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.Accessable;
@@ -55,6 +57,8 @@
private final String _name;
+ private ConnectionRegistry _connectionRegistry;
+
private QueueRegistry _queueRegistry;
private ExchangeRegistry _exchangeRegistry;
@@ -71,10 +75,11 @@
private ACLPlugin _accessManager;
- private Timer _houseKeepingTimer;
-
+ private final Timer _houseKeepingTimer;
+
private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
-
+
+
public void setAccessableName(String name)
{
_logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
@@ -86,6 +91,10 @@
return _name;
}
+ public IConnectionRegistry getConnectionRegistry() // exposes the _connectionRegistry field through its IConnectionRegistry type
+ {
+ return _connectionRegistry;
+ }
/**
* Abstract MBean class. This has some of the methods implemented from the management interface for exchanges. Any
@@ -140,12 +149,18 @@
public VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
{
+ if (name == null || name.length() == 0)
+ {
+ throw new IllegalArgumentException("Illegal name (" + name + ") for virtualhost.");
+ }
+
_name = name;
_virtualHostMBean = new VirtualHostMBean();
- // This isn't needed to be registered
- //_virtualHostMBean.register();
+ _connectionRegistry = new ConnectionRegistry(this);
+
+ _houseKeepingTimer = new Timer("Queue-housekeeping-"+name, true);
_queueRegistry = new DefaultQueueRegistry(this);
_exchangeFactory = new DefaultExchangeFactory(this);
_exchangeFactory.initialise(hostConfig);
@@ -172,25 +187,22 @@
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
-
- _houseKeepingTimer = new Timer("Queue-housekeeping-" + _name, true);
-
initialiseHouseKeeping(hostConfig);
}
private void initialiseHouseKeeping(final Configuration hostConfig)
{
-
+
long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD);
-
+
/* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
- if (period != 0L)
+ if(period != 0L)
{
class RemoveExpiredMessagesTask extends TimerTask
{
public void run()
{
- for (AMQQueue q : _queueRegistry.getQueues())
+ for(AMQQueue q : _queueRegistry.getQueues())
{
try
@@ -199,7 +211,7 @@
}
catch (AMQException e)
{
- _logger.error("Exception in housekeeping for queue: " + q.getName().toString(), e);
+ _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e);
throw new RuntimeException(e);
}
}
@@ -207,11 +219,11 @@
}
_houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
- period / 2,
- period);
+ period/2,
+ period);
}
}
-
+
private void initialiseMessageStore(Configuration config) throws Exception
{
String messageStoreClass = config.getString("store.class");
@@ -285,14 +297,20 @@
public ACLPlugin getAccessManager()
{
return _accessManager;
- }
+ }
public void close() throws Exception
{
+ //Stop Housekeeping
if (_houseKeepingTimer != null)
{
_houseKeepingTimer.cancel();
}
+
+ //Stop Connections
+ _connectionRegistry.close();
+
+ //Close MessageStore
if (_messageStore != null)
{
_messageStore.close();
Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
('svn:eol-style' removed)
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java Thu Aug 14 20:40:49 2008
@@ -1,70 +1,70 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-public class VirtualHostRegistry
-{
- private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>();
-
-
- private String _defaultVirtualHostName;
-
- public synchronized void registerVirtualHost(VirtualHost host) throws Exception
- {
- if(_registry.containsKey(host.getName()))
- {
- throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
- }
- _registry.put(host.getName(),host);
- }
-
- public VirtualHost getVirtualHost(String name)
- {
- if(name == null || name.trim().length() == 0 )
- {
- name = getDefaultVirtualHostName();
- }
-
- return _registry.get(name);
- }
-
- private String getDefaultVirtualHostName()
- {
- return _defaultVirtualHostName;
- }
-
- public void setDefaultVirtualHostName(String defaultVirtualHostName)
- {
- _defaultVirtualHostName = defaultVirtualHostName;
- }
-
-
- public Collection<VirtualHost> getVirtualHosts()
- {
- return new ArrayList<VirtualHost>(_registry.values());
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Registry of {@link VirtualHost} instances keyed by host name, with a default name used for null/blank lookups. */
+public class VirtualHostRegistry
+{
+ private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>();
+
+ /** Name substituted by getVirtualHost when the requested name is null or blank; null until set. */
+ private String _defaultVirtualHostName;
+ /** Registers host under host.getName(); throws Exception if that name is already registered. */
+ public synchronized void registerVirtualHost(VirtualHost host) throws Exception
+ {
+ if(_registry.containsKey(host.getName()))
+ {
+ throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
+ }
+ _registry.put(host.getName(),host);
+ }
+ /** Returns the host registered under name (the default name if name is null/blank), or null if none is registered. */
+ public VirtualHost getVirtualHost(String name)
+ {
+ if(name == null || name.trim().length() == 0 )
+ {
+ name = getDefaultVirtualHostName();
+ }
+ // NOTE(review): if no default name was set, name is still null here and ConcurrentHashMap.get(null) throws NullPointerException.
+ return _registry.get(name);
+ }
+ /** Returns the default host name as last passed to setDefaultVirtualHostName. */
+ private String getDefaultVirtualHostName()
+ {
+ return _defaultVirtualHostName;
+ }
+ /** Sets the name that getVirtualHost falls back to for null or blank lookups. */
+ public void setDefaultVirtualHostName(String defaultVirtualHostName)
+ {
+ _defaultVirtualHostName = defaultVirtualHostName;
+ }
+
+ /** Returns a new ArrayList holding the hosts registered at the time of the call. */
+ public Collection<VirtualHost> getVirtualHosts()
+ {
+ return new ArrayList<VirtualHost>(_registry.values());
+ }
+}
Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
('svn:eol-style' removed)
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java Thu Aug 14 20:40:49 2008
@@ -25,11 +25,11 @@
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.configuration.Configuration;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.tools.messagestore.commands.Clear;
import org.apache.qpid.tools.messagestore.commands.Command;
import org.apache.qpid.tools.messagestore.commands.Copy;
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java Thu Aug 14 20:40:49 2008
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.tools.messagestore.commands;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.server.queue.AMQQueue;
public class Copy extends Move
{
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java Thu Aug 14 20:40:49 2008
@@ -24,6 +24,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntryImpl;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import org.apache.qpid.tools.utils.Console;
@@ -255,7 +256,7 @@
String title, boolean routing, boolean headers, boolean messageHeaders)
{
List<QueueEntry> single = new LinkedList<QueueEntry>();
- single.add(new QueueEntry(null,msg));
+ single.add(new QueueEntryImpl(null,msg, Long.MIN_VALUE));
List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders);
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java Thu Aug 14 20:40:49 2008
@@ -21,7 +21,7 @@
package org.apache.qpid.tools.messagestore.commands;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntryImpl;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java Thu Aug 14 20:40:49 2008
@@ -20,9 +20,8 @@
*/
package org.apache.qpid.tools.messagestore.commands;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.server.queue.AMQQueue;
public class Purge extends Move
{