You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/26 22:04:39 UTC
svn commit: r490372 [1/3] - in
/incubator/qpid/branches/new_persistence/java: ./ broker/ broker/etc/
broker/src/main/grammar/ broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/filter/
broker/src/main/java/org/apac...
Author: rgreig
Date: Tue Dec 26 13:04:28 2006
New Revision: 490372
URL: http://svn.apache.org/viewvc?view=rev&rev=490372
Log:
Merge of trunk up to rev 489403
Added:
incubator/qpid/branches/new_persistence/java/broker/etc/qpid-server.conf.jpp
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/etc/qpid-server.conf.jpp
incubator/qpid/branches/new_persistence/java/broker/src/main/grammar/
- copied from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/grammar/
incubator/qpid/branches/new_persistence/java/broker/src/main/grammar/SelectorParser.jj
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/
- copied from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/
- copied from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/jms/
- copied from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
incubator/qpid/branches/new_persistence/java/common/src/main/java/log4j.properties
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties
incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/common/
- copied from r489403, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/
incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
incubator/qpid/branches/new_persistence/java/distribution/
incubator/qpid/branches/new_persistence/java/distribution/pom.xml (with props)
incubator/qpid/branches/new_persistence/java/distribution/src/
incubator/qpid/branches/new_persistence/java/distribution/src/main/
incubator/qpid/branches/new_persistence/java/distribution/src/main/assembly/
incubator/qpid/branches/new_persistence/java/distribution/src/main/assembly/bin.xml (with props)
incubator/qpid/branches/new_persistence/java/distribution/src/main/assembly/client-bin.xml (with props)
incubator/qpid/branches/new_persistence/java/distribution/src/main/assembly/management-eclipse-plugin-unix.xml (with props)
incubator/qpid/branches/new_persistence/java/distribution/src/main/assembly/management-eclipse-plugin.xml (with props)
incubator/qpid/branches/new_persistence/java/distribution/src/main/assembly/src.xml (with props)
incubator/qpid/branches/new_persistence/java/distribution/src/main/release/
incubator/qpid/branches/new_persistence/java/distribution/src/main/release/DISCLAIMER
incubator/qpid/branches/new_persistence/java/distribution/src/main/release/docs/
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
- copied unchanged from r489403, incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
- copied, changed from r489403, incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
Modified:
incubator/qpid/branches/new_persistence/java/broker/etc/config.xml
incubator/qpid/branches/new_persistence/java/broker/pom.xml
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java
incubator/qpid/branches/new_persistence/java/client/pom.xml
incubator/qpid/branches/new_persistence/java/client/src/log4j.properties
incubator/qpid/branches/new_persistence/java/client/src/main/java/log4j.properties
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java
incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
incubator/qpid/branches/new_persistence/java/pom.xml
incubator/qpid/branches/new_persistence/java/systests/pom.xml
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
Modified: incubator/qpid/branches/new_persistence/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/etc/config.xml?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/etc/config.xml (original)
+++ incubator/qpid/branches/new_persistence/java/broker/etc/config.xml Tue Dec 26 13:04:28 2006
@@ -82,8 +82,8 @@
<auto_register>true</auto_register>
</queue>
<store>
- <!--<class>org.apache.qpid.server.store.MemoryMessageStore</class>-->
- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <!--<class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>-->
</store>
<virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
</broker>
Modified: incubator/qpid/branches/new_persistence/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/pom.xml?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/pom.xml (original)
+++ incubator/qpid/branches/new_persistence/java/broker/pom.xml Tue Dec 26 13:04:28 2006
@@ -34,7 +34,6 @@
<properties>
<topDirectoryLocation>..</topDirectoryLocation>
- <amqj.logging.level>warn</amqj.logging.level>
</properties>
<dependencies>
@@ -55,6 +54,10 @@
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-filter-ssl</artifactId>
</dependency>
@@ -82,6 +85,29 @@
<build>
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>javacc-maven-plugin</artifactId>
+ <version>2.0</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <configuration>
+ <sourceDirectory>${basedir}/src/main/grammar</sourceDirectory>
+ <outputDirectory>${basedir}/target/generated-sources</outputDirectory>
+ <packageName>org.apache.qpid.server.filter.jms.selector</packageName>
+ </configuration>
+ <goals>
+ <goal>javacc</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Dec 26 13:04:28 2006
@@ -25,6 +25,8 @@
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
@@ -41,6 +43,8 @@
import org.apache.qpid.server.txn.TxnBuffer;
import java.util.*;
+import java.util.Set;
+import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -102,6 +106,8 @@
private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
+ private Set<Long> _browsedAcks = new HashSet<Long>();
+
public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
{
@@ -111,7 +117,7 @@
_messageStore = messageStore;
_exchanges = exchanges;
// by default the session is non-transactional
- _txnContext = new NonTransactionalContext(_messageStore, this, _returnMessages);
+ _txnContext = new NonTransactionalContext(_messageStore, this, _returnMessages, _browsedAcks);
}
/**
@@ -311,13 +317,14 @@
* @param tag the tag chosen by the client (if null, server will generate one)
* @param queue the queue to subscribe to
* @param session the protocol session of the subscriber
+ * @param noLocal
* @return the consumer tag. This is returned to the subscriber and used in
* subsequent unsubscribe requests
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
- public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks)
- throws AMQException, ConsumerTagNotUniqueException
+ public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
+ FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -328,7 +335,7 @@
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks);
+ queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal);
_consumerTag2QueueMap.put(tag, queue);
return tag;
}
@@ -524,6 +531,12 @@
return _unacknowledgedMessageMap;
}
+ public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue)
+ {
+ _browsedAcks.add(deliveryTag);
+ addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
private void checkSuspension()
{
boolean suspend;
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,7 @@
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
import javax.jms.JMSException;
@@ -58,7 +59,8 @@
throw new RuntimeException("Cannot call plus operation on: " + lvalue + " and: " + rvalue);
}
- public String getExpressionSymbol() {
+ public String getExpressionSymbol()
+ {
return "+";
}
};
@@ -193,7 +195,8 @@
}
}
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
Object lvalue = left.evaluate(message);
if (lvalue == null) {
return null;
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java Tue Dec 26 13:04:28 2006
@@ -23,6 +23,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
import javax.jms.JMSException;
@@ -33,13 +34,14 @@
*
* @version $Revision$
*/
-public interface BooleanExpression extends Expression {
-
+public interface BooleanExpression extends Expression
+{
+
/**
* @param message
* @return true if the expression evaluates to Boolean.TRUE.
* @throws JMSException
*/
- public boolean matches(AMQMessage message) throws JMSException;
+ public boolean matches(AMQMessage message) throws AMQException;
}
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
import java.util.HashSet;
import java.util.List;
@@ -123,7 +124,7 @@
/**
* org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
*/
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Object rv = this.getRight().evaluate(message);
@@ -139,7 +140,7 @@
return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE;
}
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
@@ -199,7 +200,7 @@
private static BooleanExpression doCreateEqual(Expression left, Expression right) {
return new ComparisonExpression(left, right) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Object lv = left.evaluate(message);
Object rv = right.evaluate(message);
@@ -340,7 +341,8 @@
super(left, right);
}
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
Comparable lv = (Comparable) left.evaluate(message);
if (lv == null) {
return null;
@@ -457,7 +459,7 @@
protected abstract boolean asBoolean(int answer);
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
import java.math.BigDecimal;
@@ -29,19 +30,24 @@
/**
* Represents a constant expression
- *
+ *
* @version $Revision$
*/
-public class ConstantExpression implements Expression {
+public class ConstantExpression implements Expression
+{
- static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression {
- public BooleanConstantExpression(Object value) {
+ static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression
+ {
+ public BooleanConstantExpression(Object value)
+ {
super(value);
}
- public boolean matches(AMQMessage message) throws JMSException {
+
+ public boolean matches(AMQMessage message) throws AMQException
+ {
Object object = evaluate(message);
- return object!=null && object==Boolean.TRUE;
- }
+ return object != null && object == Boolean.TRUE;
+ }
}
public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null);
@@ -50,73 +56,92 @@
private Object value;
- public static ConstantExpression createFromDecimal(String text) {
-
- // Strip off the 'l' or 'L' if needed.
- if( text.endsWith("l") || text.endsWith("L") )
- text = text.substring(0, text.length()-1);
-
- Number value;
- try {
- value = new Long(text);
- } catch ( NumberFormatException e) {
- // The number may be too big to fit in a long.
- value = new BigDecimal(text);
- }
-
+ public static ConstantExpression createFromDecimal(String text)
+ {
+
+ // Strip off the 'l' or 'L' if needed.
+ if (text.endsWith("l") || text.endsWith("L"))
+ {
+ text = text.substring(0, text.length() - 1);
+ }
+
+ Number value;
+ try
+ {
+ value = new Long(text);
+ }
+ catch (NumberFormatException e)
+ {
+ // The number may be too big to fit in a long.
+ value = new BigDecimal(text);
+ }
+
long l = value.longValue();
- if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE)
+ {
value = new Integer(value.intValue());
}
return new ConstantExpression(value);
}
- public static ConstantExpression createFromHex(String text) {
+ public static ConstantExpression createFromHex(String text)
+ {
Number value = new Long(Long.parseLong(text.substring(2), 16));
long l = value.longValue();
- if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE)
+ {
value = new Integer(value.intValue());
}
return new ConstantExpression(value);
}
- public static ConstantExpression createFromOctal(String text) {
+ public static ConstantExpression createFromOctal(String text)
+ {
Number value = new Long(Long.parseLong(text, 8));
long l = value.longValue();
- if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE)
+ {
value = new Integer(value.intValue());
}
return new ConstantExpression(value);
}
- public static ConstantExpression createFloat(String text) {
+ public static ConstantExpression createFloat(String text)
+ {
Number value = new Double(text);
return new ConstantExpression(value);
}
- public ConstantExpression(Object value) {
+ public ConstantExpression(Object value)
+ {
this.value = value;
}
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
return value;
}
- public Object getValue() {
+ public Object getValue()
+ {
return value;
- }
+ }
/**
* @see java.lang.Object#toString()
*/
- public String toString() {
- if (value == null) {
+ public String toString()
+ {
+ if (value == null)
+ {
return "NULL";
}
- if (value instanceof Boolean) {
+ if (value instanceof Boolean)
+ {
return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE";
}
- if (value instanceof String) {
+ if (value instanceof String)
+ {
return encodeString((String) value);
}
return value.toString();
@@ -127,7 +152,8 @@
*
* @see java.lang.Object#hashCode()
*/
- public int hashCode() {
+ public int hashCode()
+ {
return toString().hashCode();
}
@@ -136,9 +162,11 @@
*
* @see java.lang.Object#equals(java.lang.Object)
*/
- public boolean equals(Object o) {
+ public boolean equals(Object o)
+ {
- if (o == null || !this.getClass().equals(o.getClass())) {
+ if (o == null || !this.getClass().equals(o.getClass()))
+ {
return false;
}
return toString().equals(o.toString());
@@ -153,12 +181,15 @@
* @param s
* @return
*/
- public static String encodeString(String s) {
+ public static String encodeString(String s)
+ {
StringBuffer b = new StringBuffer();
b.append('\'');
- for (int i = 0; i < s.length(); i++) {
+ for (int i = 0; i < s.length(); i++)
+ {
char c = s.charAt(i);
- if (c == '\'') {
+ if (c == '\'')
+ {
b.append(c);
}
b.append(c);
@@ -166,5 +197,5 @@
b.append('\'');
return b.toString();
}
-
+
}
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java Tue Dec 26 13:04:28 2006
@@ -23,6 +23,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
import javax.jms.JMSException;
@@ -32,11 +33,12 @@
*
* @version $Revision$
*/
-public interface Expression {
+public interface Expression
+{
/**
* @return the value of this expression
*/
- public Object evaluate(AMQMessage message) throws JMSException;
+ public Object evaluate(AMQMessage message) throws AMQException;
}
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Tue Dec 26 13:04:28 2006
@@ -64,7 +64,7 @@
_logger.info(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
return match;
}
- catch (JMSException e)
+ catch (AMQException e)
{
//fixme this needs to be sorted.. it shouldn't happen
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
import javax.jms.JMSException;
@@ -35,7 +36,7 @@
public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) {
return new LogicExpression(lvalue, rvalue) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Boolean lv = (Boolean) left.evaluate(message);
// Can we do an OR shortcut??
@@ -56,7 +57,7 @@
public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) {
return new LogicExpression(lvalue, rvalue) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Boolean lv = (Boolean) left.evaluate(message);
@@ -85,9 +86,9 @@
super(left, right);
}
- abstract public Object evaluate(AMQMessage message) throws JMSException;
+ abstract public Object evaluate(AMQMessage message) throws AMQException;
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java Tue Dec 26 13:04:28 2006
@@ -21,9 +21,7 @@
package org.apache.qpid.server.filter;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.filter.jms.selector.SelectorParser;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.log4j.Logger;
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Tue Dec 26 13:04:28 2006
@@ -47,7 +47,7 @@
interface SubExpression
{
- public Object evaluate(AMQMessage message);
+ public Object evaluate(AMQMessage message) throws AMQException;
}
interface JMSExpression
@@ -65,7 +65,7 @@
}
- public Object evaluate(AMQMessage message)
+ public Object evaluate(AMQMessage message) throws AMQException
{
JMSMessage msg = (JMSMessage) message.getDecodedMessage(AMQMessage.JMS_MESSAGE);
if (msg != null)
@@ -226,7 +226,7 @@
jmsPropertyExpression = (SubExpression) JMS_PROPERTY_EXPRESSIONS.get(name);
}
- public Object evaluate(AMQMessage message) throws JMSException
+ public Object evaluate(AMQMessage message) throws AMQException
{
// try
// {
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
import java.math.BigDecimal;
import java.util.Collection;
@@ -43,7 +44,8 @@
public static Expression createNegate(Expression left) {
return new UnaryExpression(left) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
Object rvalue = right.evaluate(message);
if (rvalue == null) {
return null;
@@ -74,7 +76,7 @@
final Collection inList = t;
return new BooleanUnaryExpression(right) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Object rvalue = right.evaluate(message);
if (rvalue == null) {
@@ -126,7 +128,7 @@
super(left);
}
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
@@ -135,7 +137,7 @@
public static BooleanExpression createNOT(BooleanExpression left) {
return new BooleanUnaryExpression(left) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Boolean lvalue = (Boolean) right.evaluate(message);
if (lvalue == null) {
return null;
@@ -159,7 +161,7 @@
public static BooleanExpression createBooleanCast(Expression left) {
return new BooleanUnaryExpression(left) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Object rvalue = right.evaluate(message);
if (rvalue == null)
return null;
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java Tue Dec 26 13:04:28 2006
@@ -31,6 +31,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
/**
* Used to evaluate an XPath Expression in a JMS selector.
@@ -75,7 +76,7 @@
private final XPathEvaluator evaluator;
static public interface XPathEvaluator {
- public boolean evaluate(AMQMessage message) throws JMSException;
+ public boolean evaluate(AMQMessage message) throws AMQException;
}
XPathExpression(String xpath) {
@@ -97,7 +98,7 @@
}
}
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
// try {
//FIXME this is flow to disk work
// if( message.isDropped() )
@@ -122,7 +123,8 @@
* @return true if the expression evaluates to Boolean.TRUE.
* @throws JMSException
*/
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException
+ {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java Tue Dec 26 13:04:28 2006
@@ -18,6 +18,7 @@
package org.apache.qpid.server.filter;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
import javax.jms.JMSException;
//
@@ -35,7 +36,7 @@
this.xpath = xpath;
}
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
return Boolean.FALSE;
}
@@ -48,7 +49,8 @@
* @return true if the expression evaluates to Boolean.TRUE.
* @throws JMSException
*/
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException
+ {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java Tue Dec 26 13:04:28 2006
@@ -34,6 +34,7 @@
//import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.xpath.CachedXPathAPI;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
import org.w3c.dom.Document;
import org.w3c.dom.traversal.NodeIterator;
import org.xml.sax.InputSource;
@@ -46,17 +47,26 @@
this.xpath = xpath;
}
- public boolean evaluate(AMQMessage m) throws JMSException {
- if( m instanceof TextMessage ) {
- String text = ((TextMessage)m).getText();
- return evaluate(text);
- } else if ( m instanceof BytesMessage ) {
- BytesMessage bm = (BytesMessage) m;
- byte data[] = new byte[(int) bm.getBodyLength()];
- bm.readBytes(data);
- return evaluate(data);
- }
- return false;
+ public boolean evaluate(AMQMessage m) throws AMQException
+ {
+ try
+ {
+
+ if( m instanceof TextMessage ) {
+ String text = ((TextMessage)m).getText();
+ return evaluate(text);
+ } else if ( m instanceof BytesMessage ) {
+ BytesMessage bm = (BytesMessage) m;
+ byte data[] = new byte[(int) bm.getBodyLength()];
+ bm.readBytes(data);
+ return evaluate(data);
+ }
+ return false;
+ }
+ catch (JMSException e)
+ {
+ throw new AMQException("Error evaluting message: " + e, e);
+ }
}
private boolean evaluate(byte[] data) {
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Dec 26 13:04:28 2006
@@ -21,10 +21,12 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -32,6 +34,7 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.log4j.Logger;
@@ -74,8 +77,9 @@
}
try
{
- String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck);
- if(!body.nowait)
+ String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
+ body.arguments, body.noLocal);
+ if (!body.nowait)
{
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
}
@@ -83,10 +87,19 @@
//now allow queue to start async processing of any backlog of messages
queue.deliverAsync();
}
+ catch (AMQInvalidSelectorException ise)
+ {
+ _log.info("Closing connection due to invalid selector");
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
+ ise.getMessage(), BasicConsumeBody.CLASS_ID,
+ BasicConsumeBody.METHOD_ID));
+ }
catch (ConsumerTagNotUniqueException e)
{
String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
- session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID, BasicConsumeBody.METHOD_ID));
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
+ BasicConsumeBody.CLASS_ID,
+ BasicConsumeBody.METHOD_ID));
}
}
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Tue Dec 26 13:04:28 2006
@@ -78,12 +78,19 @@
AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+ //save clientProperties
+ if (protocolSession.getClientProperties() == null)
+ {
+ protocolSession.setClientProperties(body.clientProperties);
+ }
+
switch (authResult.status)
{
case ERROR:
throw new AMQException("Authentication failed");
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
+
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(),
HeartbeatConfig.getInstance().getDelay());
@@ -122,7 +129,7 @@
static int getConfiguredFrameSize()
{
final Configuration config = ApplicationRegistry.getInstance().getConfiguration();
- final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
+ final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
_logger.info("Framesize set to " + framesize);
return framesize;
}
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java Tue Dec 26 13:04:28 2006
@@ -24,6 +24,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
import javax.jms.Message;
import javax.jms.JMSException;
@@ -37,7 +38,7 @@
private AMQMessage _message;
private BasicContentHeaderProperties _properties;
- public JMSMessage(AMQMessage message)
+ public JMSMessage(AMQMessage message) throws AMQException
{
_message = message;
ContentHeaderBody contentHeader = message.getContentHeaderBody();
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Tue Dec 26 13:04:28 2006
@@ -26,9 +26,19 @@
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
-import org.apache.qpid.framing.*;
+
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.management.Managable;
@@ -84,6 +94,7 @@
/* AMQP Version for this session */
private byte _major;
private byte _minor;
+ private FieldTable _clientProperties;
public ManagedObject getManagedObject()
{
@@ -119,7 +130,7 @@
{
return new AMQProtocolSessionMBean(this);
}
- catch(JMException ex)
+ catch (JMException ex)
{
_logger.error("AMQProtocolSession MBean creation has failed ", ex);
throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
@@ -144,7 +155,7 @@
{
ProtocolInitiation pi = (ProtocolInitiation) message;
// this ensures the codec never checks for a PI message again
- ((AMQDecoder)_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
try
{
pi.checkVersion(this); // Fails if not correct
@@ -153,7 +164,7 @@
_minor = pi.protocolMinor;
String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
String locales = "en_US";
- AMQFrame response = ConnectionStartBody.createAMQFrame((short)0, pi.protocolMajor, pi.protocolMinor, null,
+ AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null,
mechanisms.getBytes(), locales.getBytes());
_minaProtocolSession.write(response);
}
@@ -195,7 +206,7 @@
_logger.debug("Method frame received: " + frame);
}
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel,
- (AMQMethodBody)frame.bodyFrame);
+ (AMQMethodBody) frame.bodyFrame);
try
{
boolean wasAnyoneInterested = false;
@@ -250,7 +261,7 @@
{
_logger.debug("Content header frame received: " + frame);
}
- getChannel(frame.channel).publishContentHeader((ContentHeaderBody)frame.bodyFrame);
+ getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame);
}
private void contentBodyReceived(AMQFrame frame) throws AMQException
@@ -294,8 +305,13 @@
return _channelMap.get(channelId);
}
- public void addChannel(AMQChannel channel)
+ public void addChannel(AMQChannel channel) throws AMQException
{
+ if (_closed)
+ {
+ throw new AMQException("Session is closed");
+ }
+
_channelMap.put(channel.getChannelId(), channel);
checkForNotification();
}
@@ -339,6 +355,7 @@
* Close a specific channel. This will remove any resources used by the channel, including:
* <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
* </ul>
+ *
* @param channelId id of the channel to close
* @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
@@ -365,6 +382,7 @@
/**
* In our current implementation this is used by the clustering code.
+ *
* @param channelId
*/
public void removeChannel(int channelId)
@@ -374,11 +392,12 @@
/**
* Initialise heartbeats on the session.
+ *
* @param delay delay in seconds (not ms)
*/
public void initHeartbeats(int delay)
{
- if(delay > 0)
+ if (delay > 0)
{
_minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
_minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.getInstance().getTimeout(delay));
@@ -388,6 +407,7 @@
/**
* Closes all channels that were opened by this protocol session. This frees up all resources
* used by the channel.
+ *
* @throws AMQException if an error occurs while closing any channel
*/
private void closeAllChannels() throws AMQException
@@ -396,6 +416,7 @@
{
channel.close(this);
}
+ _channelMap.clear();
}
/**
@@ -404,7 +425,7 @@
*/
public void closeSession() throws AMQException
{
- if(!_closed)
+ if (!_closed)
{
_closed = true;
closeAllChannels();
@@ -446,11 +467,11 @@
// information is used by SASL primary.
if (address instanceof InetSocketAddress)
{
- return ((InetSocketAddress)address).getHostName();
+ return ((InetSocketAddress) address).getHostName();
}
else if (address instanceof VmPipeAddress)
{
- return "vmpipe:" + ((VmPipeAddress)address).getPort();
+ return "vmpipe:" + ((VmPipeAddress) address).getPort();
}
else
{
@@ -466,6 +487,16 @@
public void setSaslServer(SaslServer saslServer)
{
_saslServer = saslServer;
+ }
+
+ public FieldTable getClientProperties()
+ {
+ return _clientProperties;
+ }
+
+ public void setClientProperties(FieldTable clientProperties)
+ {
+ _clientProperties = clientProperties;
}
/**
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Tue Dec 26 13:04:28 2006
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
@@ -69,7 +70,7 @@
* @param channel the channel to associate with this session. It is an error to
* associate the same channel with more than one session but this is not validated.
*/
- void addChannel(AMQChannel channel);
+ void addChannel(AMQChannel channel) throws AMQException;
/**
* Close a specific channel. This will remove any resources used by the channel, including:
@@ -122,4 +123,9 @@
* @param saslServer
*/
void setSaslServer(SaslServer saslServer);
+
+
+ FieldTable getClientProperties();
+
+ void setClientProperties(FieldTable clientProperties);
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Tue Dec 26 13:04:28 2006
@@ -18,6 +18,9 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -183,33 +186,17 @@
}
/**
- * @see AMQMinaProtocolSession#closeChannel(int)
- */
- public void closeChannel(int id) throws JMException
- {
- try
- {
- AMQChannel channel = _session.getChannel(id);
- if (channel == null)
- {
- throw new JMException("The channel (channel Id = " + id + ") does not exist");
- }
-
- _session.closeChannel(id);
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- /**
* Closes the connection. The administrator can use this management operation to close the connection to free up
* resources.
* @throws JMException
*/
public void closeConnection() throws JMException
{
+
+ final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, AMQConstant.REPLY_SUCCESS.getCode(),
+ "Broker Management Console has closing the connection.", 0, 0);
+ _session.writeFrame(response);
+
try
{
_session.closeSession();
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java Tue Dec 26 13:04:28 2006
@@ -114,15 +114,6 @@
void rollbackTransactions(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) throws JMException;
/**
- * Unsubscribes the consumers and unregisters the channel from managed objects.
- */
- @MBeanOperation(name="closeChannel",
- description="Closes the channel with given channel Id and connected consumers will be unsubscribed",
- impact= MBeanOperationInfo.ACTION)
- void closeChannel(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId)
- throws Exception;
-
- /**
* Closes all the related channels and unregisters this connection from managed objects.
*/
@MBeanOperation(name="closeConnection",
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Dec 26 13:04:28 2006
@@ -26,10 +26,14 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.message.MessageDecorator;
+import org.apache.qpid.server.message.jms.JMSMessage;
import org.apache.log4j.Logger;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Combines the information that make up a deliverable message into a more manageable form.
@@ -38,6 +42,8 @@
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
+ public static final String JMS_MESSAGE = "jms.message";
+
/**
* Used in clustering
*/
@@ -64,6 +70,9 @@
*/
private boolean _deliveredToConsumer;
+ private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
+ private AtomicBoolean _taken;
+
private TransientMessageData _transientMessageData = new TransientMessageData();
/**
@@ -141,6 +150,8 @@
_messageId = messageId;
_txnContext = txnContext;
_transientMessageData.setPublishBody(publishBody);
+ _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
+ _taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
_log.debug("Message created with id " + messageId);
@@ -358,6 +369,60 @@
return _publisher;
}
+ /**
+ * Called by selectors to determine if the message has already been sent
+ * @return _deliveredToConsumer
+ */
+ public boolean getDeliveredToConsumer()
+ {
+ return _deliveredToConsumer;
+ }
+
+
+ public MessageDecorator getDecodedMessage(String type) throws AMQException
+ {
+ MessageDecorator msgtype = null;
+
+ if (_decodedMessages != null)
+ {
+ msgtype = _decodedMessages.get(type);
+
+ if (msgtype == null)
+ {
+ msgtype = decorateMessage(type);
+ }
+ }
+
+ return msgtype;
+ }
+
+ private MessageDecorator decorateMessage(String type) throws AMQException
+ {
+ MessageDecorator msgdec = null;
+
+ if (type.equals(JMS_MESSAGE))
+ {
+ msgdec = new JMSMessage(this);
+ }
+
+ if (msgdec != null)
+ {
+ _decodedMessages.put(type, msgdec);
+ }
+
+ return msgdec;
+ }
+
+ public boolean taken()
+ {
+ return _taken.getAndSet(true);
+ }
+
+ public void release()
+ {
+ _taken.set(false);
+ }
+
public boolean checkToken(Object token)
{
if (_tokens.contains(token))
@@ -507,8 +572,7 @@
}
//
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
+ // Now start writing out the other content bodies
//
while (bodyFrameIterator.hasNext())
{
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Dec 26 13:04:28 2006
@@ -22,7 +22,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -96,7 +96,7 @@
* max allowed number of messages on a queue.
*/
private Integer _maxMessageCount = 10000;
-
+
/**
* max queue depth(KB) for the queue
*/
@@ -188,16 +188,29 @@
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
- //fixme - Pick one.
- if (Boolean.getBoolean("concurrentdeliverymanager"))
+ //fixme - Make this configurable via the broker config.xml
+ if (System.getProperties().getProperty("deliverymanager") != null)
{
- _logger.info("Using ConcurrentDeliveryManager");
- _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+ if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
+ {
+ _logger.info("Using ConcurrentSelectorDeliveryManager");
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+ }
+ else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
+ {
+ _logger.info("Using ConcurrentDeliveryManager");
+ _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+ }
+ else
+ {
+ _logger.info("Using SynchronizedDeliveryManager");
+ _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+ }
}
else
{
- _logger.info("Using SynchronizedDeliveryManager");
- _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+ _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager");
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
}
@@ -349,12 +362,26 @@
_bindings.addBinding(routingKey, exchange);
}
- public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks)
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+ {
+ registerProtocolSession(ps, channel, consumerTag, acks, filters, false);
+ }
+
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
throws AMQException
{
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
- Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks);
+ Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
+
+ if(subscription.hasFilters())
+ {
+ if (_deliveryMgr.hasQueuedMessages())
+ {
+ _deliveryMgr.populatePreDeliveryQueue(subscription);
+ }
+ }
+
_subscribers.addSubscriber(subscription);
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java Tue Dec 26 13:04:28 2006
@@ -195,6 +195,11 @@
return new ArrayList<AMQMessage>(_messages);
}
+ public void populatePreDeliveryQueue(Subscription subscription)
+ {
+ //no-op. This DM has no PreDeliveryQueues
+ }
+
public synchronized void removeAMessageFromTop() throws AMQException
{
AMQMessage msg = poll();
@@ -309,7 +314,6 @@
else
{
s.send(msg, _queue);
- msg.setDeliveredToConsumer();
}
}
finally
Copied: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (from r489403, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=490372&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java&r1=489403&p2=incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java&r2=490372
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Dec 26 13:04:28 2006
@@ -99,10 +99,10 @@
// Shrink the ContentBodies to their actual size to save memory.
if (compressBufferOnQueue)
{
- Iterator it = msg.getContentBodies().iterator();
+ Iterator<ContentBody> it = msg.getContentBodyIterator();
while (it.hasNext())
{
- ContentBody cb = (ContentBody) it.next();
+ ContentBody cb = it.next();
cb.reduceBufferToFit();
}
}
@@ -220,7 +220,7 @@
//remove sent message from our queue.
messageQueue.poll();
}
- catch (FailedDequeueException e)
+ catch (AMQException e)
{
message.release();
_log.error("Unable to deliver message as dequeue failed: " + e, e);
@@ -276,7 +276,7 @@
return _messages.poll();
}
- public void deliver(String name, AMQMessage msg) throws FailedDequeueException
+ public void deliver(String name, AMQMessage msg) throws AMQException
{
_log.info(id() + "deliver :" + System.identityHashCode(msg));
@@ -289,7 +289,7 @@
if (s == null) //no-one can take the message right now.
{
_log.info(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery");
- if (!msg.isImmediate())
+ if (!msg.getPublishBody().immediate)
{
addMessageToQueue(msg);
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Tue Dec 26 13:04:28 2006
@@ -73,4 +73,6 @@
void clearAllMessages() throws AMQException;
List<AMQMessage> getMessages();
+
+ void populatePreDeliveryQueue(Subscription subscription);
}