You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/01 06:05:38 UTC
svn commit: r780558 [1/3] - in /activemq/sandbox/activemq-flow: ./
activemq-all/ activemq-all/.settings/ activemq-all/eclipse-classes/
activemq-all/target/ activemq-broker/
activemq-broker/src/main/java/org/apache/activemq/broker/
activemq-broker/src/m...
Author: chirino
Date: Mon Jun 1 04:05:34 2009
New Revision: 780558
URL: http://svn.apache.org/viewvc?rev=780558&view=rev
Log:
- created a selector package to hold the JMS selector logic
- moved all the queue and flow bits together
- protobuf compiler can now work with test proto files
- lots of cleanup
Added:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerDatabase.java
- copied, changed from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/BrokerDatabase.java
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/
- copied from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/QueueDescriptor.java
- copied, changed from r780474, activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/RestoreListener.java
- copied, changed from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/RestoreListener.java
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/RestoredElement.java
- copied, changed from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/RestoredElement.java
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java
- copied, changed from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/SaveableQueueElement.java
activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/
activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/
activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/
activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/
- copied from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/flow/
activemq/sandbox/activemq-flow/activemq-flow/src/test/proto/
- copied from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/proto/
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireMessageEvaluationContext.java
activemq/sandbox/activemq-flow/activemq-selector/ (with props)
activemq/sandbox/activemq-flow/activemq-selector/pom.xml
activemq/sandbox/activemq-flow/activemq-selector/src/
activemq/sandbox/activemq-flow/activemq-selector/src/main/
activemq/sandbox/activemq-flow/activemq-selector/src/main/grammar/
activemq/sandbox/activemq-flow/activemq-selector/src/main/grammar/SelectorParser.jj (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/ArithmeticExpression.java (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/BinaryExpression.java (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/ComparisonExpression.java (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/ConstantExpression.java (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/Expression.java (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/FilterException.java
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/LogicExpression.java (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/PropertyExpression.java (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/UnaryExpression.java (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/XPathExpression.java (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/XQueryExpression.java (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/java/org/apache/activemq/filter/package.html (with props)
activemq/sandbox/activemq-flow/activemq-selector/src/main/resources/
activemq/sandbox/activemq-flow/activemq-selector/src/test/
activemq/sandbox/activemq-flow/activemq-selector/src/test/java/
activemq/sandbox/activemq-flow/activemq-selector/src/test/resources/
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageEvaluationContext.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/AsyncTransport.java
- copied unchanged from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/AsyncTransport.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
- copied unchanged from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransportServer.java
- copied unchanged from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/DispatchableTransportServer.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/FutureResponse.java
- copied unchanged from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/FutureResponse.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MutexTransport.java
- copied unchanged from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/MutexTransport.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/ResponseCallback.java
- copied unchanged from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ResponseCallback.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/ThreadNameFilter.java
- copied unchanged from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ThreadNameFilter.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
- copied unchanged from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/Transport.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java
- copied unchanged from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java
- copied, changed from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFactory.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
- copied unchanged from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFilter.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java
- copied unchanged from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportListener.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java
- copied unchanged from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportServer.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/Pipe.java
- copied, changed from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/flow/Pipe.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
- copied, changed from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/
- copied from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/
activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/LRUCache.java (with props)
Removed:
activemq/sandbox/activemq-flow/activemq-all/.classpath
activemq/sandbox/activemq-flow/activemq-all/.project
activemq/sandbox/activemq-flow/activemq-all/.settings/org.eclipse.jdt.core.prefs
activemq/sandbox/activemq-flow/activemq-all/eclipse-classes/
activemq/sandbox/activemq-flow/activemq-all/target/
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/BrokerDatabase.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/RestoreListener.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/RestoredElement.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/SaveableQueueElement.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/queue/
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/
activemq/sandbox/activemq-flow/activemq-broker/src/main/proto/
activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/flow/
activemq/sandbox/activemq-flow/activemq-broker/src/test/resources/META-INF/
activemq/sandbox/activemq-flow/activemq-client/.classpath
activemq/sandbox/activemq-flow/activemq-client/.project
activemq/sandbox/activemq-flow/activemq-client/.settings/org.eclipse.jdt.core.prefs
activemq/sandbox/activemq-flow/activemq-client/eclipse-classes/
activemq/sandbox/activemq-flow/activemq-client/target/
activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/Pipe.java
activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
activemq/sandbox/activemq-flow/activemq-network/.classpath
activemq/sandbox/activemq-flow/activemq-network/.project
activemq/sandbox/activemq-flow/activemq-network/.settings/org.eclipse.jdt.core.prefs
activemq/sandbox/activemq-flow/activemq-network/eclipse-classes/
activemq/sandbox/activemq-flow/activemq-network/target/
activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java
Modified:
activemq/sandbox/activemq-flow/activemq-all/ (props changed)
activemq/sandbox/activemq-flow/activemq-broker/pom.xml
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Destination.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/DurableSubscription.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageBroker.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDelivery.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Router.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/TopicSubscription.java
activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
activemq/sandbox/activemq-flow/activemq-client/ (props changed)
activemq/sandbox/activemq-flow/activemq-flow/pom.xml
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
activemq/sandbox/activemq-flow/activemq-network/ (props changed)
activemq/sandbox/activemq-flow/activemq-openwire/pom.xml
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
activemq/sandbox/activemq-flow/activemq-protobuf/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/ProtoMojo.java
activemq/sandbox/activemq-flow/activemq-protobuf/pom.xml
activemq/sandbox/activemq-flow/activemq-stomp/pom.xml
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
activemq/sandbox/activemq-flow/activemq-transport/pom.xml
activemq/sandbox/activemq-flow/kahadb/pom.xml
activemq/sandbox/activemq-flow/pom.xml
Propchange: activemq/sandbox/activemq-flow/activemq-all/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Mon Jun 1 04:05:34 2009
@@ -0,0 +1,9 @@
+.project
+.classpath
+.settings
+.wtpmodules
+*.iml
+junit*.properties
+eclipse-classes
+target
+
Modified: activemq/sandbox/activemq-flow/activemq-broker/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/pom.xml?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/pom.xml Mon Jun 1 04:05:34 2009
@@ -55,6 +55,10 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-selector</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
<artifactId>activemq-kaha</artifactId>
</dependency>
@@ -94,30 +98,6 @@
</executions>
</plugin>
- <plugin>
- <groupId>org.apache.activemq.protobuf</groupId>
- <artifactId>activemq-protobuf</artifactId>
- <configuration>
- <type>alt</type>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.5</source>
- <target>1.5</target>
- </configuration>
- </plugin>
-
</plugins>
</build>
Copied: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerDatabase.java (from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/BrokerDatabase.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerDatabase.java?p2=activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerDatabase.java&p1=activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/BrokerDatabase.java&r1=780474&r2=780558&rev=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerDatabase.java Mon Jun 1 04:05:34 2009
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.broker.db;
+package org.apache.activemq.broker;
import java.io.IOException;
import java.util.ArrayList;
@@ -27,11 +27,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activemq.broker.BrokerMessageDelivery;
-import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.broker.protocol.ProtocolHandler;
import org.apache.activemq.broker.protocol.ProtocolHandlerFactory;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.Store.Callback;
import org.apache.activemq.broker.store.Store.FatalStoreException;
@@ -49,6 +46,10 @@
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.queue.QueueDescriptor;
+import org.apache.activemq.queue.RestoreListener;
+import org.apache.activemq.queue.RestoredElement;
+import org.apache.activemq.queue.SaveableQueueElement;
import org.apache.kahadb.util.LinkedNode;
import org.apache.kahadb.util.LinkedNodeList;
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java Mon Jun 1 04:05:34 2009
@@ -21,12 +21,11 @@
import java.util.Collection;
import java.util.HashMap;
-import org.apache.activemq.broker.db.BrokerDatabase;
-import org.apache.activemq.broker.db.SaveableQueueElement;
-import org.apache.activemq.broker.db.BrokerDatabase.OperationContext;
-import org.apache.activemq.broker.store.QueueDescriptor;
+import org.apache.activemq.broker.BrokerDatabase.OperationContext;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.queue.QueueDescriptor;
+import org.apache.activemq.queue.SaveableQueueElement;
public abstract class BrokerMessageDelivery implements MessageDelivery {
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java Mon Jun 1 04:05:34 2009
@@ -22,10 +22,6 @@
import java.util.HashMap;
import java.util.Iterator;
-import org.apache.activemq.broker.db.BrokerDatabase;
-import org.apache.activemq.broker.db.RestoreListener;
-import org.apache.activemq.broker.db.SaveableQueueElement;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store.QueueQueryResult;
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.flow.ISourceController;
@@ -36,7 +32,10 @@
import org.apache.activemq.queue.IQueue;
import org.apache.activemq.queue.PartitionedQueue;
import org.apache.activemq.queue.PersistencePolicy;
+import org.apache.activemq.queue.QueueDescriptor;
import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.RestoreListener;
+import org.apache.activemq.queue.SaveableQueueElement;
import org.apache.activemq.queue.SharedPriorityQueue;
import org.apache.activemq.queue.SharedQueue;
import org.apache.activemq.queue.SharedQueueOld;
@@ -401,7 +400,7 @@
}
public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount,
- org.apache.activemq.broker.db.RestoreListener<MessageDelivery> listener) {
+ org.apache.activemq.queue.RestoreListener<MessageDelivery> listener) {
database.restoreMessages(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener);
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Destination.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Destination.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Destination.java Mon Jun 1 04:05:34 2009
@@ -18,10 +18,6 @@
import java.util.Collection;
-import org.apache.activemq.broker.Destination;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.protobuf.AsciiBuffer;
public interface Destination {
@@ -29,7 +25,6 @@
AsciiBuffer getDomain();
AsciiBuffer getName();
Collection<Destination> getDestinations();
- public ActiveMQDestination asActiveMQDestination();
public class SingleDestination implements Destination {
@@ -72,17 +67,17 @@
setDomain(new AsciiBuffer(domain));
}
- public ActiveMQDestination asActiveMQDestination() {
- if(domain.equals(Router.TOPIC_DOMAIN))
- {
- return new ActiveMQTopic(name.toString());
- }
- else if(domain.equals(Router.QUEUE_DOMAIN))
- {
- return new ActiveMQQueue(name.toString());
- }
- return null;
- }
+// public ActiveMQDestination asActiveMQDestination() {
+// if(domain.equals(Router.TOPIC_DOMAIN))
+// {
+// return new ActiveMQTopic(name.toString());
+// }
+// else if(domain.equals(Router.QUEUE_DOMAIN))
+// {
+// return new ActiveMQQueue(name.toString());
+// }
+// return null;
+// }
}
public class MultiDestination implements Destination {
@@ -112,9 +107,9 @@
return null;
}
- public ActiveMQDestination asActiveMQDestination() {
- throw new UnsupportedOperationException("Not yet implemented");
- }
+// public ActiveMQDestination asActiveMQDestination() {
+// throw new UnsupportedOperationException("Not yet implemented");
+// }
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/DurableSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/DurableSubscription.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/DurableSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/DurableSubscription.java Mon Jun 1 04:05:34 2009
@@ -16,10 +16,8 @@
*/
package org.apache.activemq.broker;
-import javax.jms.JMSException;
-
-import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.FilterException;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.flow.IFlowSink;
import org.apache.activemq.flow.ISourceController;
@@ -72,17 +70,11 @@
return true;
}
- Message msg = message.asType(Message.class);
- if (msg == null) {
- return false;
- }
-
- MessageEvaluationContext selectorContext = new MessageEvaluationContext();
- selectorContext.setMessageReference(msg);
- selectorContext.setDestination(destination.asActiveMQDestination());
+ MessageEvaluationContext selectorContext = message.createMessageEvaluationContext();
+ selectorContext.setDestination(destination);
try {
return (selector.matches(selectorContext));
- } catch (JMSException e) {
+ } catch (FilterException e) {
e.printStackTrace();
return false;
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageBroker.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageBroker.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageBroker.java Mon Jun 1 04:05:34 2009
@@ -23,7 +23,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Connection;
-import org.apache.activemq.broker.db.BrokerDatabase;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.protobuf.AsciiBuffer;
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDelivery.java Mon Jun 1 04:05:34 2009
@@ -16,12 +16,13 @@
*/
package org.apache.activemq.broker;
-import org.apache.activemq.broker.db.SaveableQueueElement;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueDescriptor;
+import org.apache.activemq.queue.SaveableQueueElement;
public interface MessageDelivery {
@@ -117,4 +118,10 @@
* @return The store tracking or -1 if not set.
*/
public long getStoreTracking();
+
+ /**
+ * Used to apply selectors against the message.
+ * @return
+ */
+ public MessageEvaluationContext createMessageEvaluationContext();
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java Mon Jun 1 04:05:34 2009
@@ -16,12 +16,13 @@
*/
package org.apache.activemq.broker;
-import org.apache.activemq.broker.db.SaveableQueueElement;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueDescriptor;
+import org.apache.activemq.queue.SaveableQueueElement;
/**
* @author cmacnaug
@@ -189,6 +190,10 @@
delegate.persist(elem, controller, delayable);
}
+ public MessageEvaluationContext createMessageEvaluationContext() {
+ // Pure pass-through: the wrapped delivery builds the selector evaluation context.
+ final MessageEvaluationContext context = delegate.createMessageEvaluationContext();
+ return context;
+ }
+
/**
* (non-Javadoc)
*
@@ -198,4 +203,5 @@
MessageDeliveryWrapper(MessageDelivery delivery) {
delegate = delivery;
}
+
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Router.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Router.java Mon Jun 1 04:05:34 2009
@@ -27,7 +27,6 @@
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.broker.QueueDomain;
import org.apache.activemq.broker.TopicDomain;
-import org.apache.activemq.broker.db.BrokerDatabase;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.protobuf.AsciiBuffer;
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/TopicSubscription.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/TopicSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/TopicSubscription.java Mon Jun 1 04:05:34 2009
@@ -16,10 +16,8 @@
*/
package org.apache.activemq.broker;
-import javax.jms.JMSException;
-
-import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.FilterException;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.queue.Subscription;
@@ -81,17 +79,11 @@
return true;
}
- Message msg = message.asType(Message.class);
- if (msg == null) {
- return false;
- }
-
- MessageEvaluationContext selectorContext = new MessageEvaluationContext();
- selectorContext.setMessageReference(msg);
- selectorContext.setDestination(destination.asActiveMQDestination());
+ MessageEvaluationContext selectorContext = message.createMessageEvaluationContext();
+ selectorContext.setDestination(destination);
try {
return (selector.matches(selectorContext));
- } catch (JMSException e) {
+ } catch (FilterException e) {
e.printStackTrace();
return false;
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Mon Jun 1 04:05:34 2009
@@ -19,7 +19,6 @@
import java.io.File;
import java.util.ArrayList;
-import org.apache.activemq.broker.db.BrokerDatabase;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.StoreFactory;
import org.apache.activemq.dispatch.IDispatcher;
Propchange: activemq/sandbox/activemq-flow/activemq-client/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Mon Jun 1 04:05:34 2009
@@ -0,0 +1,9 @@
+.project
+.classpath
+.settings
+.wtpmodules
+*.iml
+junit*.properties
+eclipse-classes
+target
+
Modified: activemq/sandbox/activemq-flow/activemq-flow/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/pom.xml?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/pom.xml Mon Jun 1 04:05:34 2009
@@ -38,9 +38,24 @@
<artifactId>activemq-util</artifactId>
<optional>false</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-dispatcher</artifactId>
+ <optional>true</optional>
+ </dependency>
<!-- Testing Dependencies -->
<dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-transport</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq.protobuf</groupId>
+ <artifactId>activemq-protobuf</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
@@ -56,19 +71,20 @@
<build>
<plugins>
- <!-- Generate a test jar for the test cases in this package -->
- <!--
<plugin>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
+ <groupId>org.apache.activemq.protobuf</groupId>
+ <artifactId>activemq-protobuf</artifactId>
+ <configuration>
+ <type>alt</type>
+ </configuration>
+ <executions>
<execution>
<goals>
- <goal>test-jar</goal>
+ <goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
- -->
</plugins>
</build>
Modified: activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=780558&r1=780474&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java Mon Jun 1 04:05:34 2009
@@ -24,10 +24,6 @@
import java.util.LinkedList;
import java.util.Map.Entry;
-import org.apache.activemq.broker.db.RestoreListener;
-import org.apache.activemq.broker.db.RestoredElement;
-import org.apache.activemq.broker.db.SaveableQueueElement;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
import org.apache.activemq.flow.IFlowController;
Modified: activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=780558&r1=780474&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java Mon Jun 1 04:05:34 2009
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.queue;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
import org.apache.activemq.flow.IFlowResource;
Modified: activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=780558&r1=780474&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Mon Jun 1 04:05:34 2009
@@ -20,7 +20,6 @@
import java.util.HashMap;
import java.util.HashSet;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
import org.apache.activemq.flow.AbstractLimitedFlowResource;
Copied: activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/QueueDescriptor.java (from r780474, activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/QueueDescriptor.java?p2=activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/QueueDescriptor.java&p1=activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java&r1=780474&r2=780558&rev=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/QueueDescriptor.java Mon Jun 1 04:05:34 2009
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.broker.store;
+package org.apache.activemq.queue;
import org.apache.activemq.protobuf.AsciiBuffer;
Modified: activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java?rev=780558&r1=780474&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStore.java Mon Jun 1 04:05:34 2009
@@ -17,10 +17,6 @@
package org.apache.activemq.queue;
-import org.apache.activemq.broker.db.RestoreListener;
-import org.apache.activemq.broker.db.SaveableQueueElement;
-import org.apache.activemq.broker.db.BrokerDatabase.OperationContext;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.flow.ISourceController;
public interface QueueStore<K, V> {
@@ -91,7 +87,6 @@
* The maximum number of messages to load (-1 if no limit)
* @param listener
* The listener to which restored elements should be passed.
- * @return The {@link OperationContext} associated with the operation
*/
public void restoreQueueElements(QueueDescriptor queue, boolean recordOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<V> listener);
Copied: activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/RestoreListener.java (from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/RestoreListener.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/RestoreListener.java?p2=activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/RestoreListener.java&p1=activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/RestoreListener.java&r1=780474&r2=780558&rev=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/RestoreListener.java (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/RestoreListener.java Mon Jun 1 04:05:34 2009
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.broker.db;
+package org.apache.activemq.queue;
import java.util.Collection;
Copied: activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/RestoredElement.java (from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/RestoredElement.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/RestoredElement.java?p2=activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/RestoredElement.java&p1=activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/RestoredElement.java&r1=780474&r2=780558&rev=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/RestoredElement.java (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/RestoredElement.java Mon Jun 1 04:05:34 2009
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.broker.db;
+package org.apache.activemq.queue;
/**
* A holder for queue elements loaded from the store.
Copied: activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java (from r780474, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/SaveableQueueElement.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java?p2=activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java&p1=activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/SaveableQueueElement.java&r1=780474&r2=780558&rev=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/db/SaveableQueueElement.java (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java Mon Jun 1 04:05:34 2009
@@ -14,9 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.broker.db;
+package org.apache.activemq.queue;
-import org.apache.activemq.broker.store.QueueDescriptor;
public interface SaveableQueueElement<V> {
Modified: activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=780558&r1=780474&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Mon Jun 1 04:05:34 2009
@@ -20,7 +20,6 @@
import java.util.HashMap;
import java.util.HashSet;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.flow.AbstractLimitedFlowResource;
import org.apache.activemq.flow.ISourceController;
Modified: activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=780558&r1=780474&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Mon Jun 1 04:05:34 2009
@@ -18,7 +18,6 @@
import java.util.HashMap;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
import org.apache.activemq.flow.IFlowController;
Modified: activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java?rev=780558&r1=780474&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueueOld.java Mon Jun 1 04:05:34 2009
@@ -23,7 +23,6 @@
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
import org.apache.activemq.flow.IFlowResource;
Modified: activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=780558&r1=780474&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Mon Jun 1 04:05:34 2009
@@ -5,15 +5,15 @@
import java.util.HashMap;
-import org.apache.activemq.broker.db.RestoreListener;
-import org.apache.activemq.broker.db.SaveableQueueElement;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.flow.MockBroker.DeliveryTarget;
import org.apache.activemq.queue.IQueue;
import org.apache.activemq.queue.PartitionedQueue;
import org.apache.activemq.queue.PersistencePolicy;
+import org.apache.activemq.queue.QueueDescriptor;
import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.RestoreListener;
+import org.apache.activemq.queue.SaveableQueueElement;
import org.apache.activemq.queue.SharedPriorityQueue;
import org.apache.activemq.queue.SharedQueue;
import org.apache.activemq.queue.SharedQueueOld;
Modified: activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java?rev=780558&r1=780474&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java Mon Jun 1 04:05:34 2009
@@ -12,12 +12,12 @@
import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
import org.apache.activemq.flow.Commands.Message.MessageBean;
import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.StatefulWireFormat;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
public class Proto2WireFormatFactory implements WireFormatFactory {
Modified: activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=780558&r1=780474&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java Mon Jun 1 04:05:34 2009
@@ -11,12 +11,12 @@
import org.apache.activemq.flow.Commands.Message.MessageBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.StatefulWireFormat;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
public class ProtoWireFormatFactory implements WireFormatFactory {
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java Mon Jun 1 04:05:34 2009
@@ -27,11 +27,11 @@
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.Store.DuplicateKeyException;
import org.apache.activemq.broker.store.Store.QueueRecord;
import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
+import org.apache.activemq.queue.QueueDescriptor;
import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.Transaction;
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Mon Jun 1 04:05:34 2009
@@ -30,7 +30,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
import org.apache.activemq.broker.store.kahadb.Data.QueueAdd;
@@ -50,6 +49,7 @@
import org.apache.activemq.protobuf.InvalidProtocolBufferException;
import org.apache.activemq.protobuf.MessageBuffer;
import org.apache.activemq.protobuf.PBMessage;
+import org.apache.activemq.queue.QueueDescriptor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.Journal;
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java Mon Jun 1 04:05:34 2009
@@ -20,10 +20,10 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store.QueueRecord;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueDescriptor;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.util.Marshaller;
import org.apache.kahadb.util.VariableMarshaller;
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Mon Jun 1 04:05:34 2009
@@ -25,12 +25,12 @@
import java.util.TreeMap;
import java.util.Map.Entry;
-import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.Store.KeyNotFoundException;
import org.apache.activemq.broker.store.Store.QueueQueryResult;
import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.QueueDescriptor;
import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
Propchange: activemq/sandbox/activemq-flow/activemq-network/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Mon Jun 1 04:05:34 2009
@@ -0,0 +1,9 @@
+.project
+.classpath
+.settings
+.wtpmodules
+*.iml
+junit*.properties
+eclipse-classes
+target
+
Modified: activemq/sandbox/activemq-flow/activemq-openwire/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/pom.xml?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/pom.xml Mon Jun 1 04:05:34 2009
@@ -40,6 +40,13 @@
<!-- Testing Dependencies -->
<dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <version>${activemq-version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Mon Jun 1 04:05:34 2009
@@ -22,6 +22,7 @@
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
@@ -139,4 +140,8 @@
public long getExpiration() {
return message.getExpiration();
}
+
+ public MessageEvaluationContext createMessageEvaluationContext() {
+ // Bind the wrapped message: the no-arg constructor leaves the context's message null,
+ // and the JMS header expressions (JMSDestination, JMSReplyTo, ...) dereference it.
+ return new OpenwireMessageEvaluationContext(message);
+ }
}
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireMessageEvaluationContext.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireMessageEvaluationContext.java?rev=780558&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireMessageEvaluationContext.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireMessageEvaluationContext.java Mon Jun 1 04:05:34 2009
@@ -0,0 +1,208 @@
+package org.apache.activemq.broker.openwire;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.filter.Expression;
+import org.apache.activemq.filter.FilterException;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.protobuf.Buffer;
+
+public class OpenwireMessageEvaluationContext implements MessageEvaluationContext {
+
+ private Message message; // message the property expressions read from; null until supplied
+
+ public OpenwireMessageEvaluationContext() { // leaves message unset; setMessage must be called before any expression is evaluated
+ }
+ public OpenwireMessageEvaluationContext(Message message) {
+ this.message = message;
+ }
+
+ public Message getMessage() {
+ return message;
+ }
+
+ public void setMessage(Message message) {
+ this.message = message;
+ }
+
+ private static final Map<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>(); // identifier name -> expression; filled once by the static initializer
+ private Object destination; // opaque value stored by setDestination and handed back by getDestination
+
+ static { // registers one expression per JMS header / JMSX identifier; each reads the message held by the evaluation context
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ ActiveMQDestination dest = message.getOriginalDestination();
+ if (dest == null) {
+ dest = message.getDestination();
+ }
+ if (dest == null) {
+ return null;
+ }
+ return dest.toString();
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ if (message.getReplyTo() == null) {
+ return null;
+ }
+ return message.getReplyTo().toString();
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSType", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ return message.getType();
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ return Integer.valueOf(message.isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ return Integer.valueOf(message.getPriority());
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSMessageID", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ if (message.getMessageId() == null) {
+ return null;
+ }
+ return message.getMessageId().toString();
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ return Long.valueOf(message.getTimestamp());
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ return message.getCorrelationId();
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ return Long.valueOf(message.getExpiration());
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ return Boolean.valueOf(message.isRedelivered());
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSXDeliveryCount", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ return Integer.valueOf(message.getRedeliveryCounter() + 1);
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSXGroupID", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ return message.getGroupID();
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSXGroupSeq", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ return Integer.valueOf(message.getGroupSequence());
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSXProducerTXID", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ TransactionId txId = message.getOriginalTransactionId();
+ if (txId == null) {
+ txId = message.getTransactionId();
+ }
+ // TransactionId.toString() is not a plain number (ActiveMQ ids render like "TX:..." /
+ // "XID:..."), so new Integer(...) threw NumberFormatException whenever a transaction
+ // id was present; expose the id in its string form instead.
+ return txId == null ? null : txId.toString();
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSActiveMQBrokerInTime", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ return Long.valueOf(message.getBrokerInTime());
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSActiveMQBrokerOutTime", new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) {
+ Message message = ((OpenwireMessageEvaluationContext) mc).message;
+ return Long.valueOf(message.getBrokerOutTime());
+ }
+ });
+ }
+
+ public Expression getPropertyExpression(final String name) {
+ // Identifiers registered in JMS_PROPERTY_EXPRESSIONS reuse their shared expression.
+ Expression registered = JMS_PROPERTY_EXPRESSIONS.get(name);
+ if( registered != null ) {
+ return registered;
+ }
+ // Any other name is resolved through Message.getProperty(name) at evaluation time.
+ return new Expression() {
+ public Object evaluate(MessageEvaluationContext mc) throws FilterException {
+ try {
+ return ((OpenwireMessageEvaluationContext) mc).message.getProperty(name);
+ } catch (IOException e) {
+ throw new FilterException(e);
+ }
+ }
+ };
+ }
+
+ // Returns the body as String (text messages) or Buffer (bytes messages); null for any other combination.
+ public <T> T getBodyAs(Class<T> type) throws FilterException {
+ try {
+ if( type == String.class && message instanceof ActiveMQTextMessage ) {
+ return type.cast(((ActiveMQTextMessage)message).getText());
+ }
+ if( type == Buffer.class && message instanceof ActiveMQBytesMessage ) {
+ ActiveMQBytesMessage bm = ((ActiveMQBytesMessage)message);
+ // reset() puts the body in read-only mode and rewinds it (javax.jms.BytesMessage contract);
+ // getBodyLength() requires that mode, and each call must read from the start of the body.
+ bm.reset();
+ byte data[] = new byte[(int) bm.getBodyLength()];
+ bm.readBytes(data);
+ return type.cast(new Buffer(data));
+ }
+ return null;
+ } catch (JMSException e) {
+ throw new FilterException(e);
+ }
+ }
+
+ public <T> T getDestination() {
+ return (T) destination; // unchecked cast: T is chosen by the caller and is not verified here
+ }
+ public Object getLocalConnectionId() {
+ throw new UnsupportedOperationException(); // this context holds no connection id to return
+ }
+ public void setDestination(Object destination) {
+ this.destination = destination; // stored as-is; returned later by getDestination()
+ }
+
+}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Mon Jun 1 04:05:34 2009
@@ -21,9 +21,6 @@
import java.util.HashMap;
import java.util.LinkedList;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-
import org.apache.activemq.WindowLimiter;
import org.apache.activemq.broker.BrokerConnection;
import org.apache.activemq.broker.BrokerMessageDelivery;
@@ -69,8 +66,8 @@
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.FilterException;
import org.apache.activemq.filter.LogicExpression;
-import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NoLocalExpression;
import org.apache.activemq.flow.AbstractLimitedFlowResource;
import org.apache.activemq.flow.Flow;
@@ -418,7 +415,7 @@
HashMap<MessageId, SubscriptionDeliveryCallback> pendingMessages = new HashMap<MessageId, SubscriptionDeliveryCallback>();
LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
- public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException, UserAlreadyConnectedException {
+ public ConsumerContext(final ConsumerInfo info) throws FilterException, UserAlreadyConnectedException {
this.info = info;
this.name = info.getConsumerId().toString();
@@ -513,12 +510,11 @@
return false;
}
- MessageEvaluationContext selectorContext = new MessageEvaluationContext();
- selectorContext.setMessageReference(msg);
+ OpenwireMessageEvaluationContext selectorContext = new OpenwireMessageEvaluationContext(msg);
selectorContext.setDestination(info.getDestination());
try {
return (selector == null || selector.matches(selectorContext));
- } catch (JMSException e) {
+ } catch (FilterException e) {
e.printStackTrace();
return false;
}
@@ -680,7 +676,7 @@
return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
}
- private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
+ private static BooleanExpression parseSelector(ConsumerInfo info) throws FilterException {
BooleanExpression rc = null;
if (info.getSelector() != null) {
rc = SelectorParser.parse(info.getSelector());
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java Mon Jun 1 04:05:34 2009
@@ -24,7 +24,6 @@
import javax.jms.JMSException;
-import org.apache.activemq.broker.db.BrokerDatabase;
import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.StoreFactory;
Modified: activemq/sandbox/activemq-flow/activemq-protobuf/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/ProtoMojo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-protobuf/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/ProtoMojo.java?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-protobuf/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/ProtoMojo.java (original)
+++ activemq/sandbox/activemq-flow/activemq-protobuf/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/ProtoMojo.java Mon Jun 1 04:05:34 2009
@@ -19,11 +19,9 @@
import java.io.File;
import java.io.FileFilter;
-import java.io.IOException;
import java.util.Arrays;
import java.util.List;
-import org.apache.activemq.protobuf.compiler.parser.ParseException;
import org.apache.maven.plugin.AbstractMojo;
import org.apache.maven.plugin.MojoExecutionException;
import org.apache.maven.project.MavenProject;
@@ -49,17 +47,31 @@
* The directory where the proto files (<code>*.proto</code>) are
* located.
*
- * @parameter expression="${sourceDirectory}" default-value="${basedir}/src/main/proto"
+ * @parameter default-value="${basedir}/src/main/proto"
*/
- private File sourceDirectory;
+ private File mainSourceDirectory;
/**
* The directory where the output files will be located.
*
- * @parameter expression="${outputDirectory}" default-value="${project.build.directory}/generated-sources/proto"
+ * @parameter default-value="${project.build.directory}/generated-sources/proto"
*/
- private File outputDirectory;
+ private File mainOutputDirectory;
+
+ /**
+ * The directory where the proto files (<code>*.proto</code>) are
+ * located.
+ *
+ * @parameter default-value="${basedir}/src/test/proto"
+ */
+ private File testSourceDirectory;
+ /**
+ * The directory where the output files will be located.
+ *
+ * @parameter default-value="${project.build.directory}/test-generated-sources/proto"
+ */
+ private File testOutputDirectory;
/**
* The type of generator to run.
@@ -70,28 +82,49 @@
public void execute() throws MojoExecutionException {
- File[] files = sourceDirectory.listFiles(new FileFilter() {
- public boolean accept(File pathname) {
- return pathname.getName().endsWith(".proto");
+ File[] mainFiles = null;
+ if ( mainSourceDirectory.exists() ) {
+ mainFiles = mainSourceDirectory.listFiles(new FileFilter() {
+ public boolean accept(File pathname) {
+ return pathname.getName().endsWith(".proto");
+ }
+ });
+ if (mainFiles==null || mainFiles.length==0) {
+ getLog().warn("No proto files found in directory: " + mainSourceDirectory.getPath());
+ } else {
+ processFiles(mainFiles, mainOutputDirectory);
+ this.project.addCompileSourceRoot(mainOutputDirectory.getAbsolutePath());
}
- });
-
- if (files==null || files.length==0) {
- getLog().warn("No proto files found in directory: " + sourceDirectory.getPath());
- return;
}
- List<File> recFiles = Arrays.asList(files);
+ File[] testFiles = null;
+ if ( testSourceDirectory.exists() ) {
+ testFiles = testSourceDirectory.listFiles(new FileFilter() {
+ public boolean accept(File pathname) {
+ return pathname.getName().endsWith(".proto");
+ }
+ });
+ if (testFiles==null || testFiles.length==0) {
+ getLog().warn("No proto files found in directory: " + testSourceDirectory.getPath());
+ } else {
+ processFiles(testFiles, testOutputDirectory);
+ this.project.addTestCompileSourceRoot(testOutputDirectory.getAbsolutePath());
+ }
+ }
+ }
+
+ private void processFiles(File[] mainFiles, File outputDir) throws MojoExecutionException {
+ List<File> recFiles = Arrays.asList(mainFiles);
for (File file : recFiles) {
try {
getLog().info("Compiling: "+file.getPath());
if( "default".equals(type) ) {
JavaGenerator generator = new JavaGenerator();
- generator.setOut(outputDirectory);
+ generator.setOut(outputDir);
generator.compile(file);
} else if( "alt".equals(type) ) {
AltJavaGenerator generator = new AltJavaGenerator();
- generator.setOut(outputDirectory);
+ generator.setOut(outputDir);
generator.compile(file);
}
} catch (CompilerException e) {
@@ -104,8 +137,6 @@
throw new MojoExecutionException("Compile failed. For more details see error messages listed above.", e);
}
}
-
- this.project.addCompileSourceRoot(outputDirectory.getAbsolutePath());
}
}
Modified: activemq/sandbox/activemq-flow/activemq-protobuf/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-protobuf/pom.xml?rev=780558&r1=780557&r2=780558&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-protobuf/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-protobuf/pom.xml Mon Jun 1 04:05:34 2009
@@ -167,7 +167,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
- <version>2.0</version>
+ <version>2.4.1</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
Propchange: activemq/sandbox/activemq-flow/activemq-selector/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Mon Jun 1 04:05:34 2009
@@ -0,0 +1,9 @@
+.project
+.classpath
+.settings
+.wtpmodules
+*.iml
+junit*.properties
+eclipse-classes
+target
+
Added: activemq/sandbox/activemq-flow/activemq-selector/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-selector/pom.xml?rev=780558&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-selector/pom.xml (added)
+++ activemq/sandbox/activemq-flow/activemq-selector/pom.xml Mon Jun 1 04:05:34 2009
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version
+ 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ applicable law or agreed to in writing, software distributed under
+ the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-parent</artifactId>
+ <version>6.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-selector</artifactId>
+ <packaging>jar</packaging>
+ <version>6.0-SNAPSHOT</version>
+
+ <name>ActiveMQ :: Selector</name>
+
+ <dependencies>
+ <!-- TODO: try to remove this dependency -->
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-beans</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-util</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>xalan</groupId>
+ <artifactId>xalan</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <!-- Testing Dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>javacc-maven-plugin</artifactId>
+ <version>2.4.1</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <configuration>
+ <sourceDirectory>${basedir}/src/main/grammar</sourceDirectory>
+ <outputDirectory>${project.build.directory}/generated-javacc</outputDirectory> <!-- follow the configured build directory instead of hard-coding target/ -->
+ <packageName>org.apache.activemq.selector</packageName>
+ </configuration>
+ <goals>
+ <goal>javacc</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>