You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/08 11:10:11 UTC
svn commit: r619823 [9/19] - in /incubator/qpid/branches/thegreatmerge/qpid:
./ cpp/ dotnet/ dotnet/Qpid.Buffer.Tests/Properties/
dotnet/Qpid.Buffer/Properties/ dotnet/Qpid.Client.Tests/
dotnet/Qpid.Client.Tests/Channel/ dotnet/Qpid.Client.Tests/Common...
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Fri Feb 8 02:09:37 2008
@@ -30,6 +30,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
@@ -64,14 +65,13 @@
private static class DeliveryDetails
{
- public AMQMessage message;
- public AMQQueue queue;
+ public QueueEntry entry;
+
private boolean deliverFirst;
- public DeliveryDetails(AMQMessage message, AMQQueue queue, boolean deliverFirst)
+ public DeliveryDetails(QueueEntry entry, boolean deliverFirst)
{
- this.message = message;
- this.queue = queue;
+ this.entry = entry;
this.deliverFirst = deliverFirst;
}
}
@@ -103,7 +103,7 @@
_postCommitDeliveryList.clear();
}
- public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException
+ public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException
{
// A publication will result in the enlisting of several
// TxnOps. The first is an op that will store the message.
@@ -112,9 +112,9 @@
// enqueued. Finally a cleanup op will be added to decrement
// the reference associated with the routing.
// message.incrementReference();
- _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
+ _postCommitDeliveryList.add(new DeliveryDetails(entry, deliverFirst));
_messageDelivered = true;
- _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
+ _txnBuffer.enlist(new CleanupMessageOperation(entry.getMessage(), _returnMessages));
/*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
if (_log.isDebugEnabled())
{
@@ -242,11 +242,11 @@
{
for (DeliveryDetails dd : _postCommitDeliveryList)
{
- dd.queue.process(_storeContext, dd.message, dd.deliverFirst);
+ dd.entry.process(_storeContext, dd.deliverFirst);
try
{
- dd.message.checkDeliveredToConsumer();
+ dd.entry.checkDeliveredToConsumer();
}
catch (NoConsumersException nce)
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Fri Feb 8 02:09:37 2008
@@ -34,6 +34,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
@@ -92,20 +93,14 @@
// Does not apply to this context
}
- public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException
+ public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException
{
try
{
- //DTX removed - deliverFirst is to do with the position on the Queue not enqueuing!!
- // This should be done in routingComplete
-// if( ! deliverFirst )
-// {
-// message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
-// }
- queue.process(_storeContext, message, deliverFirst);
+ entry.process(_storeContext, deliverFirst);
//following check implements the functionality
//required by the 'immediate' flag:
- message.checkDeliveredToConsumer();
+ entry.checkDeliveredToConsumer();
}
catch (NoConsumersException e)
{
@@ -134,7 +129,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Discarding message: " + message.message.getMessageId());
+ _log.debug("Discarding message: " + message.getMessage().getMessageId());
}
//Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -168,7 +163,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Discarding message: " + msg.message.getMessageId());
+ _log.debug("Discarding message: " + msg.getMessage().getMessageId());
}
//Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -198,7 +193,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Discarding message: " + msg.message.getMessageId());
+ _log.debug("Discarding message: " + msg.getMessage().getMessageId());
}
//Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -212,7 +207,7 @@
if (_log.isDebugEnabled())
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
- msg.message.getMessageId());
+ msg.getMessage().getMessageId());
}
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java Fri Feb 8 02:09:37 2008
@@ -25,6 +25,7 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
/**
@@ -111,14 +112,13 @@
*
* <p/>This is an 'enqueue' operation.
*
- * @param message The message to deliver.
- * @param queue The queue to deliver the message to.
+ * @param entry The message to deliver, and the queue to deliver to.
* @param deliverFirst <tt>true</tt> to place the message on the front of the queue for redelivery, <tt>false</tt>
* for normal FIFO message ordering.
*
* @throws AMQException If the message cannot be delivered for any reason.
*/
- void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException;
+ void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException;
/**
* Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java Fri Feb 8 02:09:37 2008
@@ -29,6 +29,7 @@
import org.apache.commons.configuration.MapConfiguration;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
@@ -51,6 +52,8 @@
private PrincipalDatabaseManager _databaseManager;
+ private PluginManager _pluginManager;
+
public NullApplicationRegistry()
{
@@ -81,7 +84,7 @@
VirtualHost dummyHost = new VirtualHost("test", getConfiguration());
_virtualHostRegistry.registerVirtualHost(dummyHost);
_virtualHostRegistry.setDefaultVirtualHostName("test");
-
+ _pluginManager = new PluginManager("");
_configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
}
@@ -121,6 +124,11 @@
public AccessManager getAccessManager()
{
return _accessManager;
+ }
+
+ public PluginManager getPluginManager()
+ {
+ return _pluginManager;
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Feb 8 02:09:37 2008
@@ -1,289 +1,326 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import javax.management.NotCompliantMBeanException;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.AMQBrokerManagerMBean;
-import org.apache.qpid.server.txn.TransactionManager;
-import org.apache.qpid.server.security.access.AccessManager;
-import org.apache.qpid.server.security.access.AccessManagerImpl;
-import org.apache.qpid.server.security.access.Accessable;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-
-public class VirtualHost implements Accessable
-{
- private static final Logger _logger = Logger.getLogger(VirtualHost.class);
-
-
- private final String _name;
-
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
-
- private MessageStore _messageStore;
-
- private TransactionManager _transactionManager;
-
- protected VirtualHostMBean _virtualHostMBean;
-
- private AMQBrokerManagerMBean _brokerMBean;
-
- private AuthenticationManager _authenticationManager;
-
- private AccessManager _accessManager;
-
-
- public void setAccessableName(String name)
- {
- _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
- + name + ") ignored remains :" + getAccessableName());
- }
-
- public String getAccessableName()
- {
- return _name;
- }
-
-
- /**
- * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
- * implementaion of an Exchange MBean should extend this class.
- */
- public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
- {
- public VirtualHostMBean() throws NotCompliantMBeanException
- {
- super(ManagedVirtualHost.class, "VirtualHost");
- }
-
- public String getObjectInstanceName()
- {
- return _name.toString();
- }
-
- public String getName()
- {
- return _name.toString();
- }
-
- public VirtualHost getVirtualHost()
- {
- return VirtualHost.this;
- }
-
-
- } // End of MBean class
-
- /**
- * Used for testing only
- * @param name
- * @param store
- *
- * @throws Exception
- */
- public VirtualHost(String name, MessageStore store) throws Exception
- {
- this(name, null, store);
- }
-
- /**
- * Normal Constructor
- *
- * @param name
- * @param hostConfig
- *
- * @throws Exception
- */
- public VirtualHost(String name, Configuration hostConfig) throws Exception
- {
- this(name, hostConfig, null);
- }
-
- private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
- {
- _name = name;
-
- _virtualHostMBean = new VirtualHostMBean();
- // This isn't needed to be registered
- //_virtualHostMBean.register();
-
- _queueRegistry = new DefaultQueueRegistry(this);
- _exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeFactory.initialise(hostConfig);
- _exchangeRegistry = new DefaultExchangeRegistry(this);
-
- if (store != null)
- {
- _messageStore = store;
- }
- else
- {
- if (hostConfig == null)
- {
- throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
- }
- initialiseTransactionManager(hostConfig);
- initialiseMessageStore(hostConfig);
- }
-
- _exchangeRegistry.initialise();
-
- _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
-
- _accessManager = new AccessManagerImpl(name, hostConfig);
-
- _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
- _brokerMBean.register();
- }
-
- private void initialiseMessageStore(Configuration config) throws Exception
- {
- String messageStoreClass = config.getString("store.class");
-
- Class clazz = Class.forName(messageStoreClass);
- Object o = clazz.newInstance();
-
- if (!(o instanceof MessageStore))
- {
- throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
- " does not.");
- }
- _messageStore = (MessageStore) o;
- //DTX MessageStore
-// _messageStore.configure(this, _transactionManager, "store", config);
- _messageStore.configure(this, "store", config);
- }
-
- private void initialiseTransactionManager(Configuration config) throws Exception
- {
- String transactionManagerClass = config.getString("txn.class");
- Class clazz = Class.forName(transactionManagerClass);
- Object o = clazz.newInstance();
-
- if (!(o instanceof TransactionManager))
- {
- throw new ClassCastException("Transaction Manager class must implement " + TransactionManager.class + ". Class " + clazz +
- " does not.");
- }
- _transactionManager = (TransactionManager) o;
- }
-
-
- public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
- {
- T instance;
- try
- {
- instance = instanceType.newInstance();
- }
- catch (Exception e)
- {
- _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
- throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
- }
- Configurator.configure(instance);
-
- return instance;
- }
-
-
- public String getName()
- {
- return _name;
- }
-
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
- }
-
- public ApplicationRegistry getApplicationRegistry()
- {
- throw new UnsupportedOperationException();
- }
-
- public MessageStore getMessageStore()
- {
- return _messageStore;
- }
-
- public TransactionManager getTransactionManager()
- {
- return _transactionManager;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
- public AccessManager getAccessManager()
- {
- return _accessManager;
- }
-
- public void close() throws Exception
- {
- if (_messageStore != null)
- {
- _messageStore.close();
- }
- }
-
- public ManagedObject getBrokerMBean()
- {
- return _brokerMBean;
- }
-
- public ManagedObject getManagedObject()
- {
- return _virtualHostMBean;
- }
-}
-
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import javax.management.NotCompliantMBeanException;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.security.access.AccessManager;
+import org.apache.qpid.server.security.access.AccessManagerImpl;
+import org.apache.qpid.server.security.access.Accessable;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.AMQException;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+public class VirtualHost implements Accessable
+{
+ private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+
+
+ private final String _name;
+
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private MessageStore _messageStore;
+
+ private TransactionManager _transactionManager;
+
+ protected VirtualHostMBean _virtualHostMBean;
+
+ private AMQBrokerManagerMBean _brokerMBean;
+
+ private AuthenticationManager _authenticationManager;
+
+ private AccessManager _accessManager;
+
+ private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", true);
+
+ private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
+
+ public void setAccessableName(String name)
+ {
+ _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
+ + name + ") ignored remains :" + getAccessableName());
+ }
+
+ public String getAccessableName()
+ {
+ return _name;
+ }
+
+
+ /**
+ * MBean representing this virtual host. It is constructed against the {@link ManagedVirtualHost} management
+ * interface and reports the virtual host's name as both its object instance name and its name.
+ */
+ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ {
+ public VirtualHostMBean() throws NotCompliantMBeanException
+ {
+ super(ManagedVirtualHost.class, "VirtualHost");
+ }
+
+ public String getObjectInstanceName()
+ {
+ return _name.toString();
+ }
+
+ public String getName()
+ {
+ return _name.toString();
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return VirtualHost.this;
+ }
+
+
+ } // End of MBean class
+
+ /**
+ * Used for testing only
+ * @param name
+ * @param store
+ * @throws Exception
+ */
+ public VirtualHost(String name, MessageStore store) throws Exception
+ {
+ this(name, null, store);
+ }
+
+ /**
+ * Normal Constructor
+ * @param name
+ * @param hostConfig
+ * @throws Exception
+ */
+ public VirtualHost(String name, Configuration hostConfig) throws Exception
+ {
+ this(name, hostConfig, null);
+ }
+
+ /**
+ * Builds the virtual host and all of its component registries and managers.
+ *
+ * <p/>In order: the virtual host MBean is created (but not registered), the queue registry, exchange factory and
+ * exchange registry are created, the message store is resolved, the exchange registry is initialised, the
+ * authentication and access managers are created, the broker MBean is created and registered, and finally
+ * housekeeping is initialised.
+ *
+ * @param name The name of this virtual host.
+ * @param hostConfig The configuration for this virtual host. Required when <tt>store</tt> is null, in which case
+ * the transaction manager and message store are instantiated from it.
+ * @param store The message store to use as-is, or null to create one from <tt>hostConfig</tt>.
+ *
+ * @throws Exception An IllegalAccessException if both <tt>hostConfig</tt> and <tt>store</tt> are null; otherwise
+ * whatever the component initialisation calls below throw.
+ */
+ public VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
+ {
+ _name = name;
+
+ _virtualHostMBean = new VirtualHostMBean();
+ // This isn't needed to be registered
+ //_virtualHostMBean.register();
+
+ _queueRegistry = new DefaultQueueRegistry(this);
+ _exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeFactory.initialise(hostConfig);
+ _exchangeRegistry = new DefaultExchangeRegistry(this);
+
+ // A caller-supplied store takes precedence; only without one is the configuration consulted.
+ if (store != null)
+ {
+ _messageStore = store;
+ }
+ else
+ {
+ if (hostConfig == null)
+ {
+ throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
+ }
+ initialiseTransactionManager(hostConfig);
+ initialiseMessageStore(hostConfig);
+ }
+
+ // The exchange registry is initialised only after the message store has been resolved.
+ _exchangeRegistry.initialise();
+
+ _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
+
+ _accessManager = new AccessManagerImpl(name, hostConfig);
+
+ _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
+ _brokerMBean.register();
+ initialiseHouseKeeping(hostConfig);
+ }
+
+
+ /**
+  * Schedules the periodic sweep that removes expired messages from queues which have no subscribers.
+  *
+  * <p/>The sweep period, in milliseconds, is read from <tt>housekeeping.expiredMessageCheckPeriod</tt> and falls
+  * back to {@link #DEFAULT_HOUSEKEEPING_PERIOD} when the key is absent or no configuration was supplied. A period
+  * of zero disables the sweep. The first sweep runs after half a period, subsequent ones at the full period.
+  *
+  * @param hostConfig The virtual host configuration; may be null, as the store-only constructor supplies none.
+  */
+ private void initialiseHouseKeeping(final Configuration hostConfig)
+ {
+     // The store-only constructor delegates with a null configuration, so it must not be dereferenced blindly.
+     final long period = (hostConfig == null)
+                         ? DEFAULT_HOUSEKEEPING_PERIOD
+                         : hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD);
+
+     /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
+     if (period != 0L)
+     {
+         class RemoveExpiredMessagesTask extends TimerTask
+         {
+             public void run()
+             {
+                 for (AMQQueue q : _queueRegistry.getQueues())
+                 {
+                     try
+                     {
+                         q.removeExpiredIfNoSubscribers();
+                     }
+                     catch (Exception e)
+                     {
+                         // An exception escaping run() terminates the Timer thread and silently ends every
+                         // future sweep, so log the failure and carry on with the remaining queues.
+                         _logger.error("Exception in housekeeping for queue: " + q.getName().toString(), e);
+                     }
+                 }
+             }
+         }
+
+         _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
+                                                period / 2,
+                                                period);
+     }
+ }
+
+ private void initialiseMessageStore(Configuration config) throws Exception
+ {
+ String messageStoreClass = config.getString("store.class");
+
+ Class clazz = Class.forName(messageStoreClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof MessageStore))
+ {
+ throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ " does not.");
+ }
+ _messageStore = (MessageStore) o;
+ _messageStore.configure(this, "store", config);
+ }
+
+ private void initialiseTransactionManager(Configuration config) throws Exception
+ {
+ String transactionManagerClass = config.getString("txn.class");
+ Class clazz = Class.forName(transactionManagerClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof TransactionManager))
+ {
+ throw new ClassCastException("Transaction Manager class must implement " + TransactionManager.class + ". Class " + clazz +
+ " does not.");
+ }
+ _transactionManager = (TransactionManager) o;
+ }
+
+ /**
+  * Creates an instance of the given class through its public no-argument constructor and hands it to
+  * <tt>Configurator.configure</tt> before returning it.
+  *
+  * @param instanceType The class to instantiate.
+  * @param config Not used by this method.
+  * @param <T> The type of the object created.
+  *
+  * @return The newly created instance, after it has been passed to the configurator.
+  *
+  * @throws IllegalArgumentException If the class cannot be instantiated; the original failure is kept as the cause.
+  */
+ public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
+ {
+     T instance;
+     try
+     {
+         instance = instanceType.newInstance();
+     }
+     catch (Exception e)
+     {
+         final String problem = "Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor";
+         // Log the cause as well, so the reason for the failure is visible even if the caller drops the exception.
+         _logger.error(problem, e);
+         throw new IllegalArgumentException(problem, e);
+     }
+     Configurator.configure(instance);
+
+     return instance;
+ }
+
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ public TransactionManager getTransactionManager()
+ {
+ return _transactionManager;
+ }
+
+ public AuthenticationManager getAuthenticationManager()
+ {
+ return _authenticationManager;
+ }
+
+ public AccessManager getAccessManager()
+ {
+ return _accessManager;
+ }
+
+ /**
+  * Shuts this virtual host down: stops the housekeeping timer, then closes the message store if there is one.
+  *
+  * @throws Exception If the message store fails to close.
+  */
+ public void close() throws Exception
+ {
+     // Cancel the expiry sweeps first so that no further sweep is started against a store that is being closed.
+     _houseKeepingTimer.cancel();
+
+     if (_messageStore != null)
+     {
+         _messageStore.close();
+     }
+ }
+
+ public ManagedObject getBrokerMBean()
+ {
+ return _brokerMBean;
+ }
+
+ public ManagedObject getManagedObject()
+ {
+ return _virtualHostMBean;
+ }
+}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java Fri Feb 8 02:09:37 2008
@@ -24,6 +24,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import org.apache.qpid.tools.utils.Console;
@@ -85,7 +86,7 @@
}
- protected List<List> createMessageData(java.util.List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting,
+ protected List<List> createMessageData(java.util.List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting,
boolean showMessageHeaders)
{
@@ -96,8 +97,9 @@
display.add(hex);
display.add(ascii);
- for (AMQMessage msg : messages)
+ for (QueueEntry entry : messages)
{
+ AMQMessage msg = entry.getMessage();
if (!includeMsg(msg, msgids))
{
continue;
@@ -252,8 +254,8 @@
private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg,
String title, boolean routing, boolean headers, boolean messageHeaders)
{
- List<AMQMessage> single = new LinkedList<AMQMessage>();
- single.add(msg);
+ List<QueueEntry> single = new LinkedList<QueueEntry>();
+ single.add(new QueueEntry(null,msg));
List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders);
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java Fri Feb 8 02:09:37 2008
@@ -23,6 +23,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
@@ -166,12 +167,12 @@
if (fromQueue != null)
{
- List<AMQMessage> messages = fromQueue.getMessagesOnTheQueue();
+ List<QueueEntry> messages = fromQueue.getMessagesOnTheQueue();
if (messages != null)
{
- for (AMQMessage msg : messages)
+ for (QueueEntry msg : messages)
{
- ids.add(msg.getMessageId());
+ ids.add(msg.getMessage().getMessageId());
}
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Fri Feb 8 02:09:37 2008
@@ -27,6 +27,7 @@
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import org.apache.qpid.tools.utils.Console;
@@ -113,7 +114,7 @@
if (_queue != null)
{
- List<AMQMessage> messages = _queue.getMessagesOnTheQueue();
+ List<QueueEntry> messages = _queue.getMessagesOnTheQueue();
if (messages == null || messages.size() == 0)
{
_console.println("No messages on queue");
@@ -152,7 +153,7 @@
* @param showMessageHeaders show the msg headers be shown
* @return the formated data lists for printing
*/
- protected List<List> createMessageData(List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting,
+ protected List<List> createMessageData(List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting,
boolean showMessageHeaders)
{
@@ -333,8 +334,9 @@
}
//Add create the table of data
- for (AMQMessage msg : messages)
+ for (QueueEntry entry : messages)
{
+ AMQMessage msg = entry.getMessage();
if (!includeMsg(msg, msgids))
{
continue;
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java Fri Feb 8 02:09:37 2008
@@ -43,7 +43,7 @@
}
catch (Exception e)
{
- System.out.println("Unable to start broker due to: " + e.getMessage());
+ System.err.println("Unable to start broker due to: " + e.getMessage());
e.printStackTrace();
exit(1);
@@ -55,7 +55,7 @@
try
{
Process task = Runtime.getRuntime().exec(args[0]);
- System.out.println("Started Proccess: " + args[0]);
+ System.err.println("Started Proccess: " + args[0]);
InputStream inputStream = task.getInputStream();
@@ -70,19 +70,21 @@
out.join();
err.join();
- System.out.println("Waiting for process to exit: " + args[0]);
+ System.err.println("Waiting for process to exit: " + args[0]);
task.waitFor();
- System.out.println("Done Proccess: " + args[0]);
+ System.err.println("Done Proccess: " + args[0]);
}
catch (IOException e)
{
- System.out.println("Proccess had problems: " + e.getMessage());
+ System.err.println("Proccess had problems: " + e.getMessage());
+ e.printStackTrace(System.err);
exit(1);
}
catch (InterruptedException e)
{
- System.out.println("Proccess had problems: " + e.getMessage());
+ System.err.println("Proccess had problems: " + e.getMessage());
+ e.printStackTrace(System.err);
exit(1);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Fri Feb 8 02:09:37 2008
@@ -100,7 +100,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -139,7 +139,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -158,7 +158,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -198,7 +198,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -217,7 +217,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -236,7 +236,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -255,7 +255,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -294,7 +294,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -313,7 +313,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -353,7 +353,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -387,7 +387,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -427,7 +427,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -467,7 +467,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java Fri Feb 8 02:09:37 2008
@@ -59,7 +59,7 @@
mbean.createNewBinding(_queue.getName().toString(), "binding2");
TabularData data = mbean.bindings();
- ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
+ ArrayList<Object> list = new ArrayList<Object>(data.values());
assertTrue(list.size() == 2);
// test general exchange properties
@@ -85,7 +85,7 @@
mbean.createNewBinding(_queue.getName().toString(), "binding2");
TabularData data = mbean.bindings();
- ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
+ ArrayList<Object> list = new ArrayList<Object>(data.values());
assertTrue(list.size() == 2);
// test general exchange properties
@@ -111,7 +111,7 @@
mbean.createNewBinding(_queue.getName().toString(), "key3=binding3");
TabularData data = mbean.bindings();
- ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
+ ArrayList<Object> list = new ArrayList<Object>(data.values());
assertTrue(list.size() == 2);
// test general exchange properties
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Fri Feb 8 02:09:37 2008
@@ -291,7 +291,7 @@
for (int i = 0; i < messageCount; i++)
{
- _queue.process(_storeContext, messages[i], false);
+ _queue.process(_storeContext, new QueueEntry(_queue,messages[i]), false);
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Fri Feb 8 02:09:37 2008
@@ -23,11 +23,14 @@
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -38,6 +41,8 @@
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.mina.common.ByteBuffer;
import javax.management.JMException;
import java.util.LinkedList;
@@ -59,11 +64,12 @@
new LinkedList<RequiredDeliveryException>(),
new HashSet<Long>());
private VirtualHost _virtualHost;
+ private AMQProtocolSession _protocolSession;
public void testMessageCount() throws Exception
{
int messageCount = 10;
- sendMessages(messageCount);
+ sendMessages(messageCount, false);
assertTrue(_queueMBean.getMessageCount() == messageCount);
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
@@ -76,6 +82,43 @@
_queueMBean.clearQueue();
assertTrue(_queueMBean.getMessageCount() == 0);
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ //Ensure that the data has been removed from the Store
+ verifyBrokerState();
+ }
+
+ public void testMessageCountPersistent() throws Exception // persistent-delivery variant of testMessageCount
+ {
+ int messageCount = 10;
+ sendMessages(messageCount, true); // true => message() sets delivery mode 2 instead of 1
+ assertEquals("", messageCount, _queueMBean.getMessageCount().intValue());
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+ long queueDepth = (messageCount * MESSAGE_SIZE) >> 10; // total body size divided by 1024
+ assertTrue(_queueMBean.getQueueDepth() == queueDepth);
+
+ _queueMBean.deleteMessageFromTop(); // message count must drop by one; received count must not change
+ assertTrue(_queueMBean.getMessageCount() == (messageCount - 1));
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ _queueMBean.clearQueue(); // message count must reach zero; received count still must not change
+ assertTrue(_queueMBean.getMessageCount() == 0);
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ //Ensure that the data has been removed from the Store
+ verifyBrokerState();
+ }
+
+ // TODO: move to a general testing class - duplicated from Systest/MessageReturnTest
+ private void verifyBrokerState() // asserts the virtual host's message store holds no content bodies and no message metadata
+ {
+
+ TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore()); // wraps the live store to expose its maps; the cast assumes a MemoryMessageStore is configured
+
+ // Unlike MessageReturnTest there is no need for a delay, as this thread does the clean up.
+ assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
+ assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
+ assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
+ assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
}
public void testConsumerCount() throws AMQException
@@ -168,13 +211,13 @@
}
- AMQMessage msg = message(false);
+ AMQMessage msg = message(false, false);
long id = msg.getMessageId();
_queue.clearQueue(_storeContext);
msg.enqueue(_queue);
msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
- _queue.process(_storeContext, msg, false);
+ _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
_queueMBean.viewMessageContent(id);
try
{
@@ -187,7 +230,7 @@
}
}
- private AMQMessage message(final boolean immediate) throws AMQException
+ private AMQMessage message(final boolean immediate, boolean persistent) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()
{
@@ -215,6 +258,7 @@
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
}
@@ -224,22 +268,41 @@
super.setUp();
IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _messageStore = _virtualHost.getMessageStore();
_queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
_queueMBean = new AMQQueueMBean(_queue);
+ _protocolSession = new TestMinaProtocolSession();
}
- private void sendMessages(int messageCount) throws AMQException
+ private void sendMessages(int messageCount, boolean persistent) throws AMQException // persistent is forwarded to message() for every message built here
{
AMQMessage[] messages = new AMQMessage[messageCount];
for (int i = 0; i < messages.length; i++)
{
- messages[i] = message(false);
+ messages[i] = message(false, persistent);
messages[i].enqueue(_queue);
messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
}
for (int i = 0; i < messageCount; i++)
{
- _queue.process(_storeContext, messages[i], false);
+ /* TGM: From here to end of method was:
+ * queue.process(_storeContext, messages[i], false);
+ *
+ * NOTE(review): the loop above already builds, enqueues and routes messageCount
+ * messages into messages[]; this loop builds a second, separate set of messages
+ * instead of using messages[i] - confirm that this is intended.
+ */
+
+ AMQMessage currentMessage = message(false, persistent);
+ currentMessage.enqueue(_queue);
+
+ // route header
+ currentMessage.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+
+ // Add the body so we have something to test later (a single chunk of MESSAGE_SIZE bytes)
+ currentMessage.addContentBodyFrame(_storeContext,
+ _protocolSession.getMethodRegistry()
+ .getProtocolVersionMethodConverter()
+ .convertToContentChunk(
+ new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
+ MESSAGE_SIZE)));
+
}
}
}
Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Fri Feb 8 02:09:37 2008
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.List;
+
+/**
+ * Adds some extra methods to the memory message store for testing purposes.
+ *
+ * <p>The two accessors expose the message metadata map and the content body map so that a
+ * test can inspect what the store currently holds. An instance works in one of two modes:
+ * <ul>
+ * <li>wrapping: built from an existing MemoryMessageStore, the accessors return the maps
+ * of that wrapped store;</li>
+ * <li>standalone: built with the no-arg constructor, the accessors return this instance's
+ * own maps, which that constructor creates.</li>
+ * </ul>
+ *
+ * <p>NOTE(review): in wrapping mode the constructor shown here does not create this
+ * instance's own maps; whether the superclass does so is not visible from this file.
+ */
+public class TestableMemoryMessageStore extends MemoryMessageStore
+{
+
+ MemoryMessageStore _mms = null; // wrapped store; stays null when the no-arg constructor is used
+
+ public TestableMemoryMessageStore(MemoryMessageStore mms) // wrapping mode: inspect the maps of an existing store
+ {
+ _mms = mms;
+ }
+
+ public TestableMemoryMessageStore() // standalone mode: start with empty maps of its own
+ {
+ _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
+ }
+
+ public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() // returns the live map, not a copy
+ {
+ if (_mms != null)
+ {
+ return _mms._metaDataMap; // wrapping mode: the wrapped store's map
+ }
+ else
+ {
+ return _metaDataMap; // standalone mode: this instance's own map
+ }
+ }
+
+ public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() // returns the live map, not a copy
+ {
+ if (_mms != null)
+ {
+ return _mms._contentBodyMap; // wrapping mode: the wrapped store's map
+ }
+ else
+ {
+ return _contentBodyMap; // standalone mode: this instance's own map
+ }
+ }
+}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml Fri Feb 8 02:09:37 2008
@@ -69,34 +69,12 @@
<artifactId>commons-lang</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.mina</groupId>
- <artifactId>mina-filter-ssl</artifactId>
- </dependency>
<!-- Test Dependencies -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.4.0</version>
- <scope>test</scope>
- </dependency>
<dependency> <!-- for inVm Broker -->
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymockclassextension</artifactId>
<scope>test</scope>
</dependency>
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Feb 8 02:09:37 2008
@@ -20,17 +20,29 @@
*/
package org.apache.qpid.client;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.jms.Connection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
@@ -49,25 +61,15 @@
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
-
-import org.apache.qpid.AMQConnectionFailureException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.Connection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.FailoverPolicy;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.QpidURL;
-import org.apache.qpid.url.URLSyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
@@ -79,7 +81,9 @@
* This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
* held by any child objects of this connection such as the session, producers and consumers.
*/
- protected final Object _failoverMutex = new Object();
+ private final Object _failoverMutex = new Object();
+
+ private final Object _sessionCreationLock = new Object();
/**
* A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
@@ -88,7 +92,7 @@
protected long _maximumChannelCount;
/** The maximum size of frame supported by the server */
- protected long _maximumFrameSize;
+ private long _maximumFrameSize;
/**
* The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped
@@ -98,24 +102,25 @@
protected AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- protected final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
+ private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
- protected String _clientName;
+ private String _clientName;
/** The user name to use for authentication */
- protected String _username;
+ private String _username;
/** The password to use for authentication */
- protected String _password;
+ private String _password;
/** The virtual path to connect to on the AMQ server */
- protected String _virtualHost;
+ private String _virtualHost;
+
- protected ExceptionListener _exceptionListener;
+ private ExceptionListener _exceptionListener;
- protected ConnectionListener _connectionListener;
+ private ConnectionListener _connectionListener;
- protected ConnectionURL _connectionURL;
+ private ConnectionURL _connectionURL;
/**
* Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
@@ -139,22 +144,23 @@
/*
* The connection meta data
*/
- protected QpidConnectionMetaData _connectionMetaData;
+ private QpidConnectionMetaData _connectionMetaData;
/** Configuration info for SSL */
- protected SSLConfiguration _sslConfiguration;
+ private SSLConfiguration _sslConfiguration;
- protected AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
- protected AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
- protected AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
- protected AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+ private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+ private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
/** Thread Pool for executing connection level processes. Such as returning bounced messages. */
- protected final ExecutorService _taskPool = Executors.newCachedThreadPool();
- protected static final long DEFAULT_TIMEOUT = 1000 * 30;
+ private final ExecutorService _taskPool = Executors.newCachedThreadPool();
+ private static final long DEFAULT_TIMEOUT = 1000 * 30;
+ private ProtocolVersion _protocolVersion;
protected AMQConnectionDelegate _delegate;
-
+
/**
* @param broker brokerdetails
* @param username username
@@ -276,6 +282,9 @@
_clientName = connectionURL.getClientName();
_username = connectionURL.getUsername();
_password = connectionURL.getPassword();
+
+ _protocolVersion = connectionURL.getProtocolVersion();
+
setVirtualHost(connectionURL.getVirtualHost());
if (connectionURL.getDefaultQueueExchangeName() != null)
@@ -298,9 +307,9 @@
_temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
}
-
_failoverPolicy = new FailoverPolicy(connectionURL);
- _protocolHandler = new AMQProtocolHandler(this);
+
+ _protocolHandler = new AMQProtocolHandler(this);
// We are not currently connected
_connected = false;
@@ -320,6 +329,11 @@
{
lastException = e;
+ //We need to change protocol handler here as an error during the connect will not
+ // cause the StateManager to be replaced. So the state is out of sync on reconnect
+ // This can be seen when a exception occurs during connection. i.e. log4j NoSuchMethod. (using < 1.2.12)
+ _protocolHandler.setStateManager(new AMQStateManager());
+
if (_logger.isInfoEnabled())
{
_logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(),
@@ -374,7 +388,7 @@
if (e.getCause() != null)
{
e.initCause(lastException);
- }
+ }
}
throw e;
@@ -383,6 +397,18 @@
_connectionMetaData = new QpidConnectionMetaData(this);
}
+ protected boolean checkException(Throwable thrown) // true when the failure is a ConnectException or an UnresolvedAddressException
+ {
+ Throwable cause = thrown.getCause(); // only the direct cause is inspected, not the whole cause chain
+
+ if (cause == null)
+ {
+ cause = thrown; // no cause recorded: classify the exception itself
+ }
+
+ return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
+ }
+
protected AMQConnection(String username, String password, String clientName, String virtualHost)
{
_clientName = clientName;
@@ -401,20 +427,6 @@
_virtualHost = virtualHost;
}
- protected boolean checkException(Throwable thrown)
- {
- Throwable cause = thrown.getCause();
-
- if (cause == null)
- {
- cause = thrown;
- }
-
- return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
- }
-
-
-
public boolean attemptReconnection(String host, int port)
{
BrokerDetails bd = new AMQBrokerDetails(host, port, _sslConfiguration);
@@ -520,6 +532,50 @@
return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow);
}
+ private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) // sends ChannelOpen, then BasicQos, then TxSelect when transacted
+ throws AMQException, FailoverException
+ {
+
+ ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
+
+ // TODO: Be aware of possible changes to parameter order as versions change.
+
+ _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); // presumably blocks until a ChannelOpenOk arrives - confirm against AMQProtocolHandler.syncWrite
+
+ BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
+
+ // todo send low water mark when protocol allows (prefetchLow is currently unused in this method).
+ // todo Be aware of possible changes to parameter order as versions change.
+ _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
+
+ if (transacted)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Issuing TxSelect for " + channelId);
+ }
+
+ TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody();
+
+ // TODO: Be aware of possible changes to parameter order as versions change.
+ _protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
+ }
+ }
+
+ private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) // repeats the channel setup for channelId; the error message indicates use after failover
+ throws AMQException, FailoverException
+ {
+ try
+ {
+ createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+ }
+ catch (AMQException e)
+ {
+ deregisterSession(channelId); // setup failed: deregister the session for this channel id before reporting the error
+ throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e); // e is passed along (presumably as the cause); the first argument (presumably an error code) is null
+ }
+ }
+
public void setFailoverPolicy(FailoverPolicy policy)
{
_failoverPolicy = policy;
@@ -615,7 +671,6 @@
checkNotClosed();
if (!_started)
{
- _started = true;
final Iterator it = _sessions.entrySet().iterator();
while (it.hasNext())
{
@@ -630,6 +685,7 @@
}
}
+ _started = true;
}
}
@@ -661,49 +717,87 @@
public void close(long timeout) throws JMSException
{
- synchronized (getFailoverMutex())
+ close(new ArrayList<AMQSession>(_sessions.values()),timeout);
+ }
+
+ public void close(List<AMQSession> sessions, long timeout) throws JMSException
+ {
+ synchronized(_sessionCreationLock)
{
- if (!_closed.getAndSet(true))
+ if(!sessions.isEmpty())
{
- try
+ AMQSession session = sessions.remove(0);
+ synchronized(session.getMessageDeliveryLock())
{
- long startCloseTime = System.currentTimeMillis();
-
- _taskPool.shutdown();
- closeAllSessions(null, timeout, startCloseTime);
-
- if (!_taskPool.isTerminated())
+ close(sessions, timeout);
+ }
+ }
+ else
+ {
+ synchronized (getFailoverMutex())
+ {
+ if (!_closed.getAndSet(true))
{
try
{
+ long startCloseTime = System.currentTimeMillis();
+
+ _taskPool.shutdown();
+ closeAllSessions(null, timeout, startCloseTime);
+
+ if (!_taskPool.isTerminated())
+ {
+ try
+ {
+ // adjust timeout
+ long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+
+ _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("Interrupted while shutting down connection thread pool.");
+ }
+ }
+
// adjust timeout
- long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+ timeout = adjustTimeout(timeout, startCloseTime);
+ _delegate.closeConneciton(timeout);
- _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+ // If the task pool hasn't shut down by now then call shutdownNow() on it.
+ // This will interrupt any running tasks.
+ if (!_taskPool.isTerminated())
+ {
+ List<Runnable> tasks = _taskPool.shutdownNow();
+ for (Runnable r : tasks)
+ {
+ _logger.warn("Connection close forced taskpool to prevent execution:" + r);
+ }
+ }
}
- catch (InterruptedException e)
+ catch (AMQException e)
{
- _logger.info("Interrupted while shutting down connection thread pool.");
+ JMSException jmse = new JMSException("Error closing connection: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
}
}
-
- // adjust timeout
- timeout = adjustTimeout(timeout, startCloseTime);
- _delegate.closeConneciton(timeout);
- //_protocolHandler.closeConnection(timeout);
-
- }
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error closing connection: " + e);
- jmse.setLinkedException(e);
- throw jmse;
}
}
}
}
+ private long adjustTimeout(long timeout, long startTime)
+ {
+ long now = System.currentTimeMillis();
+ timeout -= now - startTime;
+ if (timeout < 0)
+ {
+ timeout = 0;
+ }
+ return timeout;
+ }
/**
* Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
@@ -768,19 +862,6 @@
}
}
-
- private long adjustTimeout(long timeout, long startTime)
- {
- long now = System.currentTimeMillis();
- timeout -= now - startTime;
- if (timeout < 0)
- {
- timeout = 0;
- }
-
- return timeout;
- }
-
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
@@ -974,18 +1055,7 @@
/**
* Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
- * to a JMS exception liste
- {
- ArrayList sessions = new ArrayList(_sessions.values());
- _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
- for (Iterator it = sessions.iterator(); it.hasNext();)
- {
- AMQSession s = (AMQSession) it.next();
- // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
- reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
- s.resubscribe();
- }
- }ner, if configured, and propagates the exception to sessions, which in turn will
+ * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
* propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
*
* @param cause the exception
@@ -1042,6 +1112,10 @@
{
_exceptionListener.onException(je);
}
+ else
+ {
+ _logger.error("Throwable Received but no listener set: " + cause.getMessage());
+ }
if (!(cause instanceof AMQUndeliveredException) && !(cause instanceof AMQAuthenticationException))
{
@@ -1162,5 +1236,21 @@
public AMQSession getSession(int channelId)
{
return _sessions.get(channelId);
+ }
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolVersion;
+ }
+
+ public void setProtocolVersion(ProtocolVersion protocolVersion)
+ {
+ _protocolVersion = protocolVersion;
+ _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion);
+ }
+
+ public boolean isFailingOver()
+ {
+ return (_protocolHandler.getFailoverLatch() != null);
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java Fri Feb 8 02:09:37 2008
@@ -178,31 +178,25 @@
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException, FailoverException
{
-
+ ChannelOpenBody channelOpenBody = _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
// TODO: Be aware of possible changes to parameter order as versions change.
-
- _conn._protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
- _conn._protocolHandler.getProtocolMinorVersion(), null), // outOfBand
- ChannelOpenOkBody.class);
+ _conn._protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
// todo send low water mark when protocol allows.
// todo Be aware of possible changes to parameter order as versions change.
- _conn._protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
- _conn._protocolHandler.getProtocolMinorVersion(), false, // global
- prefetchHigh, // prefetchCount
- 0), // prefetchSize
- BasicQosOkBody.class);
-
+ BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
+ _conn._protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
+
if (transacted)
{
if (_logger.isDebugEnabled())
{
_logger.debug("Issuing TxSelect for " + channelId);
}
-
+ TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody();
+
// TODO: Be aware of possible changes to parameter order as versions change.
- _conn._protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
- _conn._protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class);
+ _conn._protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
}
}
@@ -212,7 +206,7 @@
*/
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
- ArrayList sessions = new ArrayList(_conn._sessions.values());
+ ArrayList sessions = new ArrayList(_conn.getSessions().values());
_logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
for (Iterator it = sessions.iterator(); it.hasNext();)
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java Fri Feb 8 02:09:37 2008
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Hashtable;
+import java.util.UUID;
import javax.jms.*;
import javax.naming.Context;
@@ -258,7 +259,7 @@
}
catch (UnknownHostException e)
{
- return null;
+ return "UnknownHost" + UUID.randomUUID();
}
}
@@ -351,7 +352,9 @@
* @param name
* @param ctx
* @param env
+ *
* @return AMQConnection,AMQTopic,AMQQueue, or AMQConnectionFactory.
+ *
* @throws Exception
*/
public Object getObjectInstance(Object obj, Name name, Context ctx, Hashtable env) throws Exception
@@ -407,8 +410,9 @@
public Reference getReference() throws NamingException
{
- return new Reference(AMQConnectionFactory.class.getName(),
- new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()),
+ return new Reference(
+ AMQConnectionFactory.class.getName(),
+ new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()),
AMQConnectionFactory.class.getName(), null); // factory location
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java Fri Feb 8 02:09:37 2008
@@ -29,6 +29,7 @@
import org.apache.qpid.client.url.URLParser_0_8;
import org.apache.qpid.client.url.URLParser_0_10;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.url.URLHelper;
@@ -53,6 +54,7 @@
private AMQShortString _defaultTopicExchangeName;
private AMQShortString _temporaryTopicExchangeName;
private AMQShortString _temporaryQueueExchangeName;
+ private ProtocolVersion _protocolVersion = ProtocolVersion.defaultProtocolVersion();
private byte _urlVersion;
public AMQConnectionURL(String fullURL) throws URLSyntaxException
@@ -104,6 +106,15 @@
public void setURLVersion(byte version)
{
_urlVersion = version;
+ if(_options.containsKey(OPTIONS_PROTOCOL_VERSION))
+ {
+ ProtocolVersion pv = ProtocolVersion.parse(_options.get(OPTIONS_PROTOCOL_VERSION));
+ if(pv != null)
+ {
+ _protocolVersion = pv;
+ }
+ }
+
}
public String getURL()
@@ -264,6 +275,11 @@
public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName)
{
_temporaryTopicExchangeName = temporaryTopicExchangeName;
+ }
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolVersion;
}
public String toString()
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java Fri Feb 8 02:09:37 2008
@@ -49,6 +49,6 @@
//Not sure what the best approach is here, probably to treat this like a topic
//and allow server to generate names. As it is AMQ specific it doesn't need to
//fit the JMS API expectations so this is not as yet critical.
- return false;
+ return getAMQQueueName() == null;
}
}