You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/17 02:12:50 UTC
svn commit: r964988 [1/2] - in /activemq/sandbox/activemq-apollo-actor:
apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apac...
Author: chirino
Date: Sat Jul 17 00:12:48 2010
New Revision: 964988
URL: http://svn.apache.org/viewvc?rev=964988&view=rev
Log:
Laying foundation for supporting more flexible message routing.
- Wildcard bits are in now
- Queues are now created using an extensible 'binding' object which controls how the queue gets connected to destinations.
- Allows us to use queues to implement durable subs
- binding controls the queue filter
- and which destinations it binds to (one queue can bind to multiple destinations)
Added:
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java
activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionBindingDTO.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java
activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/JsonCodec.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/PointToPointBindingDTO.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java
activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlCodec.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlEncoderDecoder.java
activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlEncoderDecoderTest.java
activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/
activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java
activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java
activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java
activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapEntry.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java
activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java
activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java
activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathSupport.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java
activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PrefixPathFilter.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java
activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/SimplePathFilter.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java
activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/WildcardPathFilter.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java
activemq/sandbox/activemq-apollo-actor/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/
activemq/sandbox/activemq-apollo-actor/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java
activemq/sandbox/activemq-apollo-actor/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java
- copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java
Removed:
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java
activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlEncoderDecoder.java
activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlEncoderDecoderTest.java
Modified:
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/proto/data.proto
activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala
activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala
activemq/sandbox/activemq-apollo-actor/apollo-dto/pom.xml
activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/proto/data.proto
activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala
activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java
activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
activemq/sandbox/activemq-apollo-actor/apollo-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
activemq/sandbox/activemq-apollo-actor/apollo-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala
activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ConfigurationResource.scala
activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala
activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/BrokerDTO.scaml
activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml
activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml
Added: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index?rev=964988&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index (added)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index Sat Jul 17 00:12:48 2010
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.broker.PointToPointBindingFactory
+org.apache.activemq.apollo.broker.DurableSubBindingFactory
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=964988&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Sat Jul 17 00:12:48 2010
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import org.apache.activemq.apollo.util.ClassFinder
+import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
+import org.apache.activemq.apollo.dto.{JsonCodec, DurableSubscriptionBindingDTO, PointToPointBindingDTO, BindingDTO}
+import org.apache.activemq.apollo.selector.SelectorParser
+import org.apache.activemq.apollo.filter.{ConstantExpression, BooleanExpression}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object BindingFactory {
+
+ trait Provider {
+ def create(binding_kind:AsciiBuffer, binding_data:Buffer):Binding
+ def create(binding_dto:BindingDTO):Binding
+ }
+
+ def discover = {
+ val finder = new ClassFinder[Provider]("META-INF/services/org.apache.activemq.apollo/binding-factory.index")
+ finder.new_instances
+ }
+
+ var providers = discover
+
+ def create(binding_kind:AsciiBuffer, binding_data:Buffer):Binding = {
+ providers.foreach { provider=>
+ val rc = provider.create(binding_kind, binding_data)
+ if( rc!=null ) {
+ return rc
+ }
+ }
+ throw new IllegalArgumentException("Invalid binding type: "+binding_kind);
+ }
+ def create(binding_dto:BindingDTO):Binding = {
+ providers.foreach { provider=>
+ val rc = provider.create(binding_dto)
+ if( rc!=null ) {
+ return rc
+ }
+ }
+ throw new IllegalArgumentException("Invalid binding type: "+binding_dto);
+ }
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait Binding {
+
+ /**
+ * A user friendly description of the binding.
+ */
+ def label:String
+
+ /**
+ * Wires a queue into the a virtual host based on the binding information contained
+ * in the buffer.
+ */
+ def bind(node:RoutingNode, queue:Queue)
+
+ def unbind(node:RoutingNode, queue:Queue)
+
+ def binding_kind:AsciiBuffer
+
+ def binding_data:Buffer
+
+ def binding_dto:BindingDTO
+
+ def message_filter:BooleanExpression = ConstantExpression.TRUE
+
+ def destination:AsciiBuffer
+}
+
+object PointToPointBinding {
+ val POINT_TO_POINT_KIND = new AsciiBuffer("p2p")
+ val DESTINATION_PATH = new AsciiBuffer("default");
+}
+
+import PointToPointBinding._
+
+class PointToPointBindingFactory extends BindingFactory.Provider {
+
+ def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
+ if( binding_kind == POINT_TO_POINT_KIND ) {
+ val dto = new PointToPointBindingDTO
+ dto.destination = binding_data.ascii.toString
+ new PointToPointBinding(binding_data, dto)
+ } else {
+ null
+ }
+ }
+
+ def create(binding_dto:BindingDTO) = {
+ if( binding_dto.isInstanceOf[PointToPointBindingDTO] ) {
+ val p2p_dto = binding_dto.asInstanceOf[PointToPointBindingDTO]
+ val data = new AsciiBuffer(p2p_dto.destination).buffer
+ new PointToPointBinding(data, p2p_dto)
+ } else {
+ null
+ }
+ }
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class PointToPointBinding(val binding_data:Buffer, val binding_dto:PointToPointBindingDTO) extends Binding {
+
+ def binding_kind = POINT_TO_POINT_KIND
+
+ def unbind(node: RoutingNode, queue: Queue) = {
+ if( node.unified ) {
+ node.remove_broadcast_consumer(queue)
+ }
+ }
+
+ def bind(node: RoutingNode, queue: Queue) = {
+ if( node.unified ) {
+ node.add_broadcast_consumer(queue)
+ }
+ }
+
+ def label = binding_dto.destination
+
+ override def hashCode = binding_kind.hashCode ^ binding_data.hashCode
+
+ override def equals(o:Any):Boolean = o match {
+ case x: PointToPointBinding => x.binding_data == binding_data
+ case _ => false
+ }
+
+ def destination = new AsciiBuffer(binding_dto.destination)
+}
+
+
+object DurableSubBinding {
+ val DURABLE_SUB_KIND = new AsciiBuffer("ds")
+}
+
+import DurableSubBinding._
+
+class DurableSubBindingFactory extends BindingFactory.Provider {
+ def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
+ if( binding_kind == DURABLE_SUB_KIND ) {
+ new DurableSubBinding(binding_data, JsonCodec.decode(binding_data, classOf[DurableSubscriptionBindingDTO]))
+ } else {
+ null
+ }
+ }
+ def create(binding_dto:BindingDTO) = {
+ if( binding_dto.isInstanceOf[DurableSubscriptionBindingDTO] ) {
+ new DurableSubBinding(JsonCodec.encode(binding_dto), binding_dto.asInstanceOf[DurableSubscriptionBindingDTO])
+ } else {
+ null
+ }
+ }
+
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DurableSubBinding(val binding_data:Buffer, val binding_dto:DurableSubscriptionBindingDTO) extends Binding {
+
+ def binding_kind = DURABLE_SUB_KIND
+
+
+ def unbind(node: RoutingNode, queue: Queue) = {
+ node.add_broadcast_consumer(queue)
+ }
+
+ def bind(node: RoutingNode, queue: Queue) = {
+ node.remove_broadcast_consumer(queue)
+ }
+
+ def label = {
+ var rc = "sub: '"+binding_dto.subscription_id+"'"
+ if( binding_dto.filter!=null ) {
+ rc += " filtering '"+binding_dto.filter+"'"
+ }
+ if( binding_dto.client_id!=null ) {
+ rc += " for client '"+binding_dto.client_id+"'"
+ }
+ rc
+ }
+
+ override def hashCode = binding_kind.hashCode ^ binding_data.hashCode
+
+ override def equals(o:Any):Boolean = o match {
+ case x: DurableSubBinding => x.binding_data == binding_data
+ case _ => false
+ }
+
+ override def message_filter = {
+ if ( binding_dto.filter==null ) {
+ ConstantExpression.TRUE
+ } else {
+ SelectorParser.parse(binding_dto.filter)
+ }
+ }
+
+ def destination = new AsciiBuffer(binding_dto.destination)
+
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala Sat Jul 17 00:12:48 2010
@@ -53,13 +53,13 @@ object DestinationParser {
}
} else {
value.getDomain match {
- case Domain.QUEUE_DOMAIN =>
+ case Router.QUEUE_DOMAIN =>
baos.write(options.queuePrefix)
- case Domain.TOPIC_DOMAIN =>
+ case Router.TOPIC_DOMAIN =>
baos.write(options.topicPrefix)
- case Domain.TEMP_QUEUE_DOMAIN =>
+ case Router.TEMP_QUEUE_DOMAIN =>
baos.write(options.tempQueuePrefix)
- case Domain.TEMP_TOPIC_DOMAIN =>
+ case Router.TEMP_TOPIC_DOMAIN =>
baos.write(options.tempTopicPrefix)
}
baos.write(value.getName)
@@ -97,16 +97,16 @@ object DestinationParser {
} else {
if (options.queuePrefix != null && value.startsWith(options.queuePrefix)) {
var name = value.slice(options.queuePrefix.length, value.length).ascii();
- return new SingleDestination(Domain.QUEUE_DOMAIN, name);
+ return new SingleDestination(Router.QUEUE_DOMAIN, name);
} else if (options.topicPrefix != null && value.startsWith(options.topicPrefix)) {
var name = value.slice(options.topicPrefix.length, value.length).ascii();
- return new SingleDestination(Domain.TOPIC_DOMAIN, name);
+ return new SingleDestination(Router.TOPIC_DOMAIN, name);
} else if (options.tempQueuePrefix != null && value.startsWith(options.tempQueuePrefix)) {
var name = value.slice(options.tempQueuePrefix.length, value.length).ascii();
- return new SingleDestination(Domain.TEMP_QUEUE_DOMAIN, name);
+ return new SingleDestination(Router.TEMP_QUEUE_DOMAIN, name);
} else if (options.tempTopicPrefix != null && value.startsWith(options.tempTopicPrefix)) {
var name = value.slice(options.tempTopicPrefix.length, value.length).ascii();
- return new SingleDestination(Domain.TEMP_TOPIC_DOMAIN, name);
+ return new SingleDestination(Router.TEMP_TOPIC_DOMAIN, name);
} else {
if (options.defaultDomain == null) {
return null;
Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sat Jul 17 00:12:48 2010
@@ -30,28 +30,6 @@ import org.apache.activemq.apollo.store.
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.util.list._
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait QueueLifecyleListener {
-
- /**
- * A destination has bean created
- *
- * @param queue
- */
- def onCreate(queue: Queue);
-
- /**
- * A destination has bean destroyed
- *
- * @param queue
- */
- def onDestroy(queue: Queue);
-
-}
-
-
object Queue extends Log {
val subcsription_counter = new AtomicInteger(0)
}
@@ -60,16 +38,18 @@ object Queue extends Log {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Queue(val host: VirtualHost, val destination: Destination, val id: Long) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
+class Queue(val host: VirtualHost, var id:Long, val binding:Binding) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
override protected def log = Queue
var all_subscriptions = Map[DeliveryConsumer, Subscription]()
var fast_subscriptions = List[Subscription]()
- override val dispatchQueue: DispatchQueue = createQueue(destination.toString);
+ val filter = binding.message_filter
+
+ override val dispatchQueue: DispatchQueue = createQueue(binding.label);
dispatchQueue.setTargetQueue(getRandomThreadQueue)
dispatchQueue {
- debug("created queue for: " + destination)
+ debug("created queue for: " + binding.label)
}
setDisposer(^ {
ack_source.release
@@ -177,25 +157,46 @@ class Queue(val host: VirtualHost, val d
}
if( tune_persistent ) {
- host.store.listQueueEntryRanges(id, tune_flush_range_size) { ranges=>
- dispatchQueue {
- if( !ranges.isEmpty ) {
-
- ranges.foreach { range =>
- val entry = new QueueEntry(Queue.this, range.firstQueueSeq).init(range)
- entries.addLast(entry)
-
- message_seq_counter = range.lastQueueSeq + 1
- enqueue_item_counter += range.count
- enqueue_size_counter += range.size
- }
- debug("restored: "+enqueue_item_counter)
- }
+ if( id == -1 ) {
+ id = host.queue_id_counter.incrementAndGet
+
+ val record = new QueueRecord
+ record.key = id
+ record.binding_data = binding.binding_data
+ record.binding_kind = binding.binding_kind
+
+ host.store.addQueue(record) { rc =>
completed
}
+
+ } else {
+
+ host.store.listQueueEntryRanges(id, tune_flush_range_size) { ranges=>
+ dispatchQueue {
+ if( !ranges.isEmpty ) {
+
+ ranges.foreach { range =>
+ val entry = new QueueEntry(Queue.this, range.firstQueueSeq).init(range)
+ entries.addLast(entry)
+
+ message_seq_counter = range.lastQueueSeq + 1
+ enqueue_item_counter += range.count
+ enqueue_size_counter += range.size
+ }
+
+ debug("restored: "+enqueue_item_counter)
+ }
+ completed
+ }
+ }
+
}
+
} else {
+ if( id == -1 ) {
+ id = host.queue_id_counter.incrementAndGet
+ }
completed
}
}
@@ -450,7 +451,7 @@ class Queue(val host: VirtualHost, val d
//
/////////////////////////////////////////////////////////////////////
- def matches(message: Delivery) = {true}
+ def matches(delivery: Delivery) = filter.matches(delivery.message)
def connect(p: DeliveryProducer) = new DeliverySession {
retain
@@ -502,7 +503,7 @@ class Queue(val host: VirtualHost, val d
//
/////////////////////////////////////////////////////////////////////
- def connected(values: List[DeliveryConsumer]) = bind(values)
+ def connected() = {}
def bind(values: List[DeliveryConsumer]) = retaining(values) {
for (consumer <- values) {
@@ -583,7 +584,6 @@ class Queue(val host: VirtualHost, val d
this.dispatchQueue.setTargetQueue(value.getTargetQueue)
}
}
-
}
object QueueEntry extends Sizer[QueueEntry] {
@@ -1285,7 +1285,6 @@ class QueueEntry(val queue:Queue, val se
}
-
}
/**
Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Sat Jul 17 00:12:48 2010
@@ -21,49 +21,25 @@ import _root_.org.fusesource.hawtbuf._
import _root_.org.fusesource.hawtdispatch._
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import path.PathMap
import collection.JavaConversions
-import org.apache.activemq.apollo.util.LongCounter
-import collection.mutable.HashMap
import org.apache.activemq.apollo.util._
+import collection.mutable.{ListBuffer, HashMap}
+import org.apache.activemq.apollo.store.QueueRecord
+import org.apache.activemq.apollo.dto.{PointToPointBindingDTO, BindingDTO}
+import path.{PathFilter, PathMap}
+import scala.collection.immutable.List
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object Domain {
+object Router extends Log {
val TOPIC_DOMAIN = new AsciiBuffer("topic");
val QUEUE_DOMAIN = new AsciiBuffer("queue");
val TEMP_TOPIC_DOMAIN = new AsciiBuffer("temp-topic");
val TEMP_QUEUE_DOMAIN = new AsciiBuffer("temp-queue");
-}
-
-import Domain._
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class Domain {
-
- val targets = new PathMap[DeliveryConsumer]();
-
- def bind(name:AsciiBuffer, queue:DeliveryConsumer) = {
- targets.put(name, queue);
- }
-
- def unbind(name:AsciiBuffer, queue:DeliveryConsumer) = {
- targets.remove(name, queue);
- }
-
-//
-// synchronized public Collection<DeliveryTarget> route(AsciiBuffer name, MessageDelivery delivery) {
-// return targets.get(name);
-// }
-
-}
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object Router extends Log {
+ val QUEUE_KIND = new AsciiBuffer("queue");
+ val DEFAULT_QUEUE_PATH = new AsciiBuffer("default");
}
/**
@@ -81,133 +57,263 @@ object Router extends Log {
*/
class Router(val host:VirtualHost) extends DispatchLogging {
+ override protected def log = Router
+
+ import Router._
+
val destination_id_counter = new LongCounter
- override protected def log = Router
protected def dispatchQueue:DispatchQueue = host.dispatchQueue
- trait DestinationNode {
- val destination:Destination
- val id = destination_id_counter.incrementAndGet
- var targets = List[DeliveryConsumer]()
- var routes = List[DeliveryProducerRoute]()
+ var queues = HashMap[Binding, Queue]()
+
+ // Only stores simple paths, used for wild card lookups.
+ var destinations = new PathMap[RoutingNode]()
+ // Can store consumers on wild cards paths
+ val broadcast_consumers = new PathMap[DeliveryConsumer]()
+ // Can store bindings on wild cards paths
+ val bindings = new PathMap[Queue]()
- def on_bind(x:List[DeliveryConsumer]):Unit
- def on_unbind(x:List[DeliveryConsumer]):Boolean
- def on_connect(route:DeliveryProducerRoute):Unit
- def on_disconnect(route:DeliveryProducerRoute):Boolean = {
- routes = routes.filterNot({r=> route==r})
- route.disconnected()
- routes == Nil && targets == Nil
+ private def is_topic(destination:Destination) = {
+ destination.getDomain match {
+ case TOPIC_DOMAIN => true
+ case TEMP_TOPIC_DOMAIN => true
+ case _ => false
}
}
- class TopicDestinationNode(val destination:Destination) extends DestinationNode {
- def on_bind(x:List[DeliveryConsumer]) = {
- targets = x ::: targets
- routes.foreach({r=>
- r.bind(x)
- })
- }
+ def routing_nodes:Iterable[RoutingNode] = JavaConversions.asIterable(destinations.get(PathFilter.ANY_DESCENDENT))
+
+ def create_destination_or(destination:AsciiBuffer)(func:(RoutingNode)=>Unit):RoutingNode = {
- def on_unbind(x:List[DeliveryConsumer]):Boolean = {
- targets = targets.filterNot({t=>x.contains(t)})
- routes.foreach({r=>
- r.unbind(x)
- })
- routes == Nil && targets == Nil
- }
+ // We can't create a wild card destination.. only wild card subscriptions.
+ assert( !PathFilter.containsWildCards(destination) )
- def on_connect(route:DeliveryProducerRoute) = {
- routes = route :: routes
- route.connected(targets)
- }
- }
+ var rc = destinations.chooseValue( destination )
+ if( rc == null ) {
- class QueueDestinationNode(val destination:Destination) extends DestinationNode {
- var queue:Queue = null
+ // A new destination is being created...
+ rc = new RoutingNode(this, destination )
+ destinations.put(destination, rc)
- // once the queue is created.. connect it up with the producers and targets.
- host.getQueue(destination) { q =>
- dispatchQueue {
- queue = q;
- queue.bind(targets)
- routes.foreach({route=>
- route.connected(queue :: Nil)
- })
+ // bind any matching wild card subs
+ import JavaConversions._
+ broadcast_consumers.get( destination ).foreach { c=>
+ rc.add_broadcast_consumer(c)
}
- }
-
- def on_bind(x:List[DeliveryConsumer]) = {
- targets = x ::: targets
- if( queue!=null ) {
- queue.bind(x)
+ bindings.get( destination ).foreach { queue=>
+ rc.add_queue(queue)
}
+
+ } else {
+ func(rc)
}
+ rc
+ }
- def on_unbind(x:List[DeliveryConsumer]):Boolean = {
- targets = targets.filterNot({t=>x.contains(t)})
- if( queue!=null ) {
- queue.unbind(x)
+ def get_destination_matches(destination:AsciiBuffer) = {
+ import JavaConversions._
+ asIterable(destinations.get( destination ))
+ }
+
+ def _create_queue(id:Long, binding:Binding):Queue = {
+ val queue = new Queue(host, id, binding)
+ queue.start
+ queues.put(binding, queue)
+
+ // Not all queues are bound to destinations.
+ val name = binding.destination
+ if( name!=null ) {
+ bindings.put(name, queue)
+ // make sure the destination is created if this is not a wild card sub
+ if( !PathFilter.containsWildCards(name) ) {
+ create_destination_or(name) { node=>
+ node.add_queue(queue)
+ }
+ } else {
+ get_destination_matches(name).foreach( node=>
+ node.add_queue(queue)
+ )
}
- routes == Nil && targets == Nil
+
}
+ queue
+ }
- def on_connect(route:DeliveryProducerRoute) = {
- routes = route :: routes
- if( queue!=null ) {
- route.connected(queue :: Nil)
- }
+ def create_queue(record:QueueRecord) = {
+ _create_queue(record.key, BindingFactory.create(record.binding_kind, record.binding_data))
+ }
+
+ /**
+ * Returns the previously created queue if it already existed.
+ */
+ def _create_queue(dto: BindingDTO): Some[Queue] = {
+ val binding = BindingFactory.create(dto)
+ val queue = queues.get(binding) match {
+ case Some(queue) => Some(queue)
+ case None => Some(_create_queue(-1, binding))
}
+ queue
}
- var destinations = new HashMap[Destination, DestinationNode]()
+ def create_queue(dto:BindingDTO)(cb: (Option[Queue])=>Unit) = ^{
+ cb(_create_queue(dto))
+ } >>: dispatchQueue
- private def get(destination:Destination):DestinationNode = {
- destinations.getOrElseUpdate(destination,
- if( isTopic(destination) ) {
- new TopicDestinationNode(destination)
- } else {
- new QueueDestinationNode(destination)
- }
+ /**
+ * Returns true if the queue no longer exists.
+ */
+ def destroy_queue(dto:BindingDTO)(cb: (Boolean)=>Unit) = ^{
+ val binding = BindingFactory.create(dto)
+ val queue = queues.get(binding) match {
+ case Some(queue) =>
+ val name = binding.destination
+ if( name!=null ) {
+ get_destination_matches(name).foreach( node=>
+ node.remove_queue(queue)
+ )
+ }
+ queue.stop
+ true
+ case None =>
+ true
+ }
+ cb(queue)
+ } >>: dispatchQueue
+
+ /**
+ * Gets an existing queue.
+ */
+ def get_queue(dto:BindingDTO)(cb: (Option[Queue])=>Unit) = ^{
+ val binding = BindingFactory.create(dto)
+ cb(queues.get(binding))
+ } >>: dispatchQueue
+
+ def bind(destination:Destination, consumer:DeliveryConsumer) = retaining(consumer) {
+
+ assert( is_topic(destination) )
+
+ val name = destination.getName
+
+ // make sure the destination is created if this is not a wild card sub
+ if( !PathFilter.containsWildCards(name) ) {
+ val node = create_destination_or(name) { node=> }
+ }
+
+ get_destination_matches(name).foreach( node=>
+ node.add_broadcast_consumer(consumer)
)
- }
+ broadcast_consumers.put(name, consumer)
+
+ } >>: dispatchQueue
+
+ def unbind(destination:Destination, consumer:DeliveryConsumer) = releasing(consumer) {
+ assert( is_topic(destination) )
+ val name = destination.getName
+ broadcast_consumers.remove(name, consumer)
+ get_destination_matches(name).foreach{ node=>
+ node.remove_broadcast_consumer(consumer)
+ }
+ } >>: dispatchQueue
- def bind(destination:Destination, targets:List[DeliveryConsumer]) = retaining(targets) {
- get(destination).on_bind(targets)
- } >>: dispatchQueue
-
- def unbind(destination:Destination, targets:List[DeliveryConsumer]) = releasing(targets) {
- if( get(destination).on_unbind(targets) ) {
- destinations.remove(destination)
- }
- } >>: dispatchQueue
def connect(destination:Destination, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
+
val route = new DeliveryProducerRoute(this, destination, producer) {
override def on_connected = {
completed(this);
}
}
- ^ {
- get(destination).on_connect(route)
- } >>: dispatchQueue
- }
- def isTopic(destination:Destination) = destination.getDomain == TOPIC_DOMAIN
- def isQueue(destination:Destination) = !isTopic(destination)
+ dispatchQueue {
+
+ val topic = is_topic(destination)
+
+ // Looking up the queue will cause it to get created if it does not exist.
+ val queue = if( !topic ) {
+ val dto = new PointToPointBindingDTO
+ dto.destination = destination.getName.toString
+ _create_queue(dto)
+ } else {
+ None
+ }
+
+ val node = create_destination_or(destination.getName) { node=> }
+ if( node.unified || topic ) {
+ node.add_broadcast_producer( route )
+ } else {
+ route.bind( queue.toList )
+ }
+
+ route.connected()
+ }
+ }
def disconnect(route:DeliveryProducerRoute) = releasing(route) {
- get(route.destination).on_disconnect(route)
- } >>: dispatchQueue
+ val topic = is_topic(route.destination)
+ val node = create_destination_or(route.destination.getName) { node=> }
+ if( node.unified || topic ) {
+ node.remove_broadcast_producer(route)
+ }
+ route.disconnected()
+
+ } >>: dispatchQueue
+
+}
- def each(proc:(Destination, DestinationNode)=>Unit) = dispatchQueue {
- import JavaConversions._
- for( (destination, node) <- destinations ) {
- proc(destination, node)
- }
- }
+
+/**
+ * Tracks state associated with a destination name.
+ */
+class RoutingNode(val router:Router, val name:AsciiBuffer) {
+
+ val id = router.destination_id_counter.incrementAndGet
+
+ var broadcast_producers = ListBuffer[DeliveryProducerRoute]()
+ var broadcast_consumers = ListBuffer[DeliveryConsumer]()
+ var queues = ListBuffer[Queue]()
+
+ // TODO: extract the node's config from the host config object
+ def unified = false
+
+ def add_broadcast_consumer (consumer:DeliveryConsumer) = {
+ broadcast_consumers += consumer
+
+ val list = consumer :: Nil
+ broadcast_producers.foreach({ r=>
+ r.bind(list)
+ })
+ }
+
+ def remove_broadcast_consumer (consumer:DeliveryConsumer) = {
+ broadcast_consumers = broadcast_consumers.filterNot( _ == consumer )
+
+ val list = consumer :: Nil
+ broadcast_producers.foreach({ r=>
+ r.unbind(list)
+ })
+ }
+
+ def add_broadcast_producer (producer:DeliveryProducerRoute) = {
+ broadcast_producers += producer
+ producer.bind(broadcast_consumers.toList)
+ }
+
+ def remove_broadcast_producer (producer:DeliveryProducerRoute) = {
+ broadcast_producers = broadcast_producers.filterNot( _ == producer )
+ producer.unbind(broadcast_consumers.toList)
+ }
+
+ def add_queue (queue:Queue) = {
+ queue.binding.bind(this, queue)
+ queues += queue
+ }
+
+ def remove_queue (queue:Queue) = {
+ queues = queues.filterNot( _ == queue )
+ queue.binding.unbind(this, queue)
+ }
}
@@ -216,13 +322,13 @@ class Router(val host:VirtualHost) exten
*/
trait Route extends Retained {
- def destination:Destination
def dispatchQueue:DispatchQueue
val metric = new AtomicLong();
- def connected(targets:List[DeliveryConsumer]):Unit
def bind(targets:List[DeliveryConsumer]):Unit
def unbind(targets:List[DeliveryConsumer]):Unit
+
+ def connected():Unit
def disconnected():Unit
}
@@ -230,7 +336,7 @@ trait Route extends Retained {
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class DeliveryProducerRoute(val router:Router, val destination:Destination, val producer:DeliveryProducer) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
+case class DeliveryProducerRoute(val router:Router, val destination:Destination, val producer:DeliveryProducer) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
override protected def log = Router
override def dispatchQueue = producer.dispatchQueue
@@ -243,8 +349,7 @@ class DeliveryProducerRoute(val router:R
var targets = List[DeliverySession]()
- def connected(targets:List[DeliveryConsumer]) = retaining(targets) {
- internal_bind(targets)
+ def connected() = ^{
on_connected
} >>: dispatchQueue
Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Sat Jul 17 00:12:48 2010
@@ -20,15 +20,14 @@ import _root_.java.util.{ArrayList, Hash
import _root_.java.lang.{String}
import _root_.org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue}
import _root_.scala.collection.JavaConversions._
-import path.PathFilter
-import org.fusesource.hawtbuf.AsciiBuffer
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import org.apache.activemq.apollo.dto.{VirtualHostDTO}
import java.util.concurrent.TimeUnit
-import org.apache.activemq.apollo.store.{Store, StoreFactory, QueueRecord}
+import org.apache.activemq.apollo.store.{Store, StoreFactory}
import org.apache.activemq.apollo.util._
import ReporterLevel._
+import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -80,8 +79,6 @@ class VirtualHost(val broker: Broker, va
override val dispatchQueue:DispatchQueue = ScalaDispatch.createQueue("virtual-host");
var config:VirtualHostDTO = _
- val queues = new HashMap[AsciiBuffer, Queue]()
- val durableSubs = new HashMap[String, DurableSubscription]()
val router = new Router(this)
var names:List[String] = Nil;
@@ -159,15 +156,11 @@ class VirtualHost(val broker: Broker, va
// Use a global queue to so we concurrently restore
// the queues.
globalQueue {
- store.getQueueStatus(queueKey) { x =>
+ store.getQueue(queueKey) { x =>
x match {
- case Some(info)=>
-
+ case Some(record)=>
dispatchQueue ^{
- val dest = DestinationParser.parse(info.record.name, destination_parser_options)
- val queue = new Queue(this, dest, queueKey)
- queue.start
- queues.put(dest.getName, queue)
+ router.create_queue(record)
task.run
}
case _ =>
@@ -221,159 +214,53 @@ class VirtualHost(val broker: Broker, va
}
}
- def getQueue(destination:Destination)(cb: (Queue)=>Unit ) = ^{
- if( !serviceState.isStarted ) {
- error("getQueue can only be called while the service is running.")
- cb(null)
- } else {
- var queue = queues.get(destination.getName);
- if( queue==null && config.auto_create_queues ) {
- addQueue(destination)(cb)
- } else {
- cb(queue)
- }
- }
- } |>>: dispatchQueue
-
-
// Try to periodically re-balance connections so that consumers/producers
// are grouped onto the same thread.
def schedualConnectionRegroup:Unit = {
def connectionRegroup = {
- router.each { (destination, node)=>
- node match {
- case x:router.TopicDestinationNode=>
-
- // 1->1 is the easy case...
- if( node.targets.size==1 && node.routes.size==1 ) {
- // move the producer to the consumer thread.
- node.routes.head.producer.collocate( node.targets.head.dispatchQueue )
- } else {
- // we need to get fancy perhaps look at rates
- // to figure out how to be group the connections.
- }
-
- case x:router.QueueDestinationNode=>
-
- if( node.targets.size==1 ) {
- // move the queue to the consumer
- x.queue.collocate( node.targets.head.dispatchQueue )
- } else {
- // we need to get fancy perhaps look at rates
- // to figure out how to be group the connections.
- }
-
- if( node.routes.size==1 ) {
- // move the producer to the queue.
- node.routes.head.producer.collocate( x.queue.dispatchQueue )
- } else {
- // we need to get fancy perhaps look at rates
- // to figure out how to be group the connections.
- }
- }
- }
- schedualConnectionRegroup
- }
- dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ if(serviceState.isStarted) { connectionRegroup } } )
- }
-
-
- def addQueue(dest:Destination)(cb: (Queue)=>Unit ) = ^{
- val name = DestinationParser.toBuffer(dest, destination_parser_options)
- if( store!=null ) {
- val record = new QueueRecord
- record.name = name
- record.key = queue_id_counter.incrementAndGet
-
- store.addQueue(record) { rc =>
- rc match {
- case true =>
- dispatchQueue {
- val queue = new Queue(this, dest, record.key)
- queue.start()
- queues.put(dest.getName, queue)
- cb(queue)
- }
- case false => // store could not create
- cb(null)
- }
- }
- } else {
- val queue = new Queue(this, dest, queue_id_counter.incrementAndGet)
- queue.start()
- queues.put(dest.getName, queue)
- cb(queue)
- }
- } |>>: dispatchQueue
-
- def createSubscription(consumer:ConsumerContext):BrokerSubscription = {
- createSubscription(consumer, consumer.getDestination());
- }
-
- def createSubscription(consumer:ConsumerContext, destination:Destination):BrokerSubscription = {
-
- // First handle composite destinations..
- var destinations = destination.getDestinations();
- if (destinations != null) {
- var subs :List[BrokerSubscription] = Nil
- for (childDest <- destinations) {
- subs ::= createSubscription(consumer, childDest);
- }
- return new CompositeSubscription(destination, subs);
- }
-
- // If it's a Topic...
-// if ( destination.getDomain == TOPIC_DOMAIN || destination.getDomain == TEMP_TOPIC_DOMAIN ) {
+ // this should really be much more fancy. It should look at the messaging
+ // rates between producers and consumers, look for natural data flow partitions
+ // and then try to equally divide the load over the available processing
+ // threads/cores.
+// router.destinations.valuesIterator.foreach { node =>
+ // todo
+// if( node.get_queue==null ) {
+// // Looks like a topic destination...
+//
+// // 1->1 is the easy case...
+// if( node.direct_consumers.size==1 && node.producers.size==1 ) {
+// // move the producer to the consumer thread.
+// node.producers.head.producer.collocate( node.direct_consumers.head.dispatchQueue )
+// } else {
+// // we need to get fancy perhaps look at rates
+// // to figure out how to be group the connections.
+// }
+// } else {
+// // Looks like a queue destination...
//
-// // It might be a durable subscription on the topic
-// if (consumer.isDurable()) {
-// var dsub = durableSubs.get(consumer.getSubscriptionName());
-// if (dsub == null) {
-//// TODO:
-//// IQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(consumer.getSubscriptionName());
-//// queue.start();
-//// dsub = new DurableSubscription(this, destination, consumer.getSelectorExpression(), queue);
-//// durableSubs.put(consumer.getSubscriptionName(), dsub);
-// }
-// return dsub;
+// if( node.direct_consumers.size==1 ) {
+// // move the queue to the consumer
+// node.get_queue.collocate( node.direct_consumers.head.dispatchQueue )
+// } else {
+// // we need to get fancy perhaps look at rates
+// // to figure out how to be group the connections.
// }
//
-// // return a standard subscription
-//// TODO:
-//// return new TopicSubscription(this, destination, consumer.getSelectorExpression());
-// return null;
+// if( node.producers.size==1 ) {
+// // move the producer to the queue.
+// node.producers.head.producer.collocate( node.get_queue.dispatchQueue )
+// } else {
+// // we need to get fancy perhaps look at rates
+// // to figure out how to be group the connections.
+// }
+//
+// }
// }
-
- // It looks like a wild card subscription on a queue..
- if (PathFilter.containsWildCards(destination.getName())) {
- return new WildcardQueueSubscription(this, destination, consumer);
- }
-
- // It has to be a Queue subscription then..
- var queue = queues.get(destination.getName());
- if (queue == null) {
- if (consumer.autoCreateDestination()) {
-// TODO
-// queue = createQueue(destination);
- } else {
- throw new IllegalStateException("The queue does not exist: " + destination.getName());
- }
- }
-// TODO:
-// return new Queue.QueueSubscription(queue);
- return null;
- }
-
-
- val queueLifecyleListeners = new ArrayList[QueueLifecyleListener]();
-
- def addDestinationLifecyleListener(listener:QueueLifecyleListener):Unit= {
- queueLifecyleListeners.add(listener);
+ schedualConnectionRegroup
+ }
+ dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ if(serviceState.isStarted) { connectionRegroup } } )
}
- def removeDestinationLifecyleListener(listener:QueueLifecyleListener):Unit= {
- queueLifecyleListeners.add(listener);
- }
}
Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala Sat Jul 17 00:12:48 2010
@@ -22,7 +22,7 @@ import java.net.{URL, URI}
import org.apache.activemq.apollo.broker._
import org.apache.activemq.apollo.dto._
import java.lang.String
-import XmlEncoderDecoder._
+import XmlCodec._
import org.apache.activemq.apollo.util._
class XmlBrokerFactory extends BrokerFactory.Provider {
Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala Sat Jul 17 00:12:48 2010
@@ -17,11 +17,11 @@
package org.apache.activemq.apollo.broker.protocol
import java.io.{IOException}
-import org.apache.activemq.apollo.broker.{Message, BrokerConnection}
import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
import org.apache.activemq.apollo.util.ClassFinder
import org.apache.activemq.apollo.store.MessageRecord
import org.apache.activemq.apollo.transport._
+import org.apache.activemq.apollo.broker.{Delivery, Message, BrokerConnection}
/**
* <p>
Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala Sat Jul 17 00:12:48 2010
@@ -224,7 +224,7 @@ abstract class BrokerPerfSupport extends
var dests = new Array[Destination](destCount)
for (i <- 0 until destCount) {
- val domain = if (PTP) {Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN}
+ val domain = if (PTP) {Router.QUEUE_DOMAIN} else {Router.TOPIC_DOMAIN}
val name = new AsciiBuffer("dest" + (i + 1))
var bean = new SingleDestination(domain, name)
dests(i) = bean
Modified: activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/proto/data.proto?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/proto/data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/proto/data.proto Sat Jul 17 00:12:48 2010
@@ -32,3 +32,9 @@ message PBQueueEntryRecord {
optional int32 size = 3;
optional int32 redeliveries = 4;
}
+
+message PBQueueRecord {
+ required int64 key=1;
+ optional bytes binding_kind = 2 [java_override_type = "AsciiBuffer"];
+ optional bytes binding_data = 3;
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala Sat Jul 17 00:12:48 2010
@@ -94,6 +94,24 @@ class CassandraClient() {
pb.freeze.toUnframedByteArray
}
+ implicit def decodeQueueRecord(v: Array[Byte]): QueueRecord = {
+ import PBQueueRecord._
+ val pb = PBQueueRecord.FACTORY.parseUnframed(v)
+ val rc = new QueueRecord
+ rc.key = pb.getKey
+ rc.binding_kind = pb.getBindingKind
+ rc.binding_data = pb.getBindingData
+ rc
+ }
+
+ implicit def encodeQueueRecord(v: QueueRecord): Array[Byte] = {
+ val pb = new PBQueueRecord.Bean
+ pb.setKey(v.key)
+ pb.setBindingKind(v.binding_kind)
+ pb.setBindingData(v.binding_data)
+ pb.freeze.toUnframedByteArray
+ }
+
def purge() = {
withSession {
session =>
@@ -109,7 +127,7 @@ class CassandraClient() {
def addQueue(record: QueueRecord) = {
withSession {
session =>
- session.insert(schema.queue_name \ (record.key, record.name))
+ session.insert(schema.queue_name \ (record.key, record))
}
}
@@ -133,25 +151,14 @@ class CassandraClient() {
}
}
- def getQueueStatus(id: Long): Option[QueueStatus] = {
+ def getQueue(id: Long): Option[QueueRecord] = {
withSession {
session =>
session.get(schema.queue_name \ id) match {
case Some(x) =>
+ val record:QueueRecord = x.value
+ Some(record)
- val rc = new QueueStatus
- rc.record = new QueueRecord
- rc.record.key = id
- rc.record.name = new AsciiBuffer(x.value)
-
- rc.count = session.count( schema.entries \ id )
-
- // TODO
- // rc.count =
- // rc.first =
- // rc.last =
-
- Some(rc)
case None =>
None
}
Modified: activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala Sat Jul 17 00:12:48 2010
@@ -185,9 +185,9 @@ class CassandraStore extends Store with
}
}
- def getQueueStatus(id: Long)(callback: (Option[QueueStatus]) => Unit) = {
+ def getQueue(id: Long)(callback: (Option[QueueRecord]) => Unit) = {
blocking {
- callback( client.getQueueStatus(id) )
+ callback( client.getQueue(id) )
}
}
Modified: activemq/sandbox/activemq-apollo-actor/apollo-dto/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/pom.xml?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/pom.xml Sat Jul 17 00:12:48 2010
@@ -38,6 +38,18 @@
<artifactId>jackson-core-asl</artifactId>
<version>${jackson-version}</version>
</dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>${jackson-version}</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf</artifactId>
+ <version>${hawtbuf-version}</version>
+ <optional>true</optional>
+ </dependency>
<!-- Testing Dependencies -->
<dependency>
Copied: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java Sat Jul 17 00:12:48 2010
@@ -14,20 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.store;
+package org.apache.activemq.apollo.dto;
-import org.fusesource.hawtbuf.AsciiBuffer;
+import org.codehaus.jackson.annotate.JsonTypeInfo;
+import javax.xml.bind.annotation.*;
/**
+ * <p>
+ * </p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class QueueRecord {
-
- public long key = -1;
- public AsciiBuffer name;
- public AsciiBuffer queueType;
-
-// public AsciiBuffer parent;
-
+@XmlType(name = "binding")
+@XmlSeeAlso({PointToPointBindingDTO.class, DurableSubscriptionBindingDTO.class})
+@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class BindingDTO {
}
\ No newline at end of file
Copied: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java Sat Jul 17 00:12:48 2010
@@ -18,34 +18,34 @@ package org.apache.activemq.apollo.dto;
import org.codehaus.jackson.annotate.JsonProperty;
-import java.util.ArrayList;
-
import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name = "virtual-host")
+@XmlRootElement(name = "destination")
@XmlAccessorType(XmlAccessType.FIELD)
-public class VirtualHostDTO extends ServiceDTO<String> {
-
- @XmlElement(name="host-name", required=true)
- public ArrayList<String> host_names = new ArrayList<String>();
+public class DestinationDTO {
- @XmlElementRef
- public StoreDTO store;
+ /**
+ * The name or wild card name of the destination
+ */
+ public String name;
/**
- * Should queues be auto created when they are first accessed
- * by clients?
+ * The kind of destination, "queue" or "topic"
*/
- @JsonProperty("auto_create_queues")
- @XmlAttribute(name="auto-create-queues")
- public boolean auto_create_queues = true;
+ public String kind;
/**
- * Should queues be purged on startup?
+ * If set to true, then routing then there is no difference between
+ * sending to a queue or topic of the same name. The first time
+ * a queue subscriptions is created, it will act like if a durable
+ * subscription was created on the topic.
*/
- @XmlAttribute(name="purge-on-startup")
- public boolean purge_on_startup = false;
+ public boolean unified = false;
+
+
+
}
Modified: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java Sat Jul 17 00:12:48 2010
@@ -39,12 +39,6 @@ public class DestinationStatusDTO extend
public String name;
/**
- * The routing domain
- */
- @XmlAttribute
- public String domain;
-
- /**
* Ids of all connections that are producing to the destination
*/
@XmlElement(name="producer")
Copied: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionBindingDTO.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionBindingDTO.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionBindingDTO.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionBindingDTO.java Sat Jul 17 00:12:48 2010
@@ -14,20 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.store;
-
-import org.fusesource.hawtbuf.AsciiBuffer;
+package org.apache.activemq.apollo.dto;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
/**
+ * <p>
+ * </p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class QueueRecord {
+@XmlRootElement(name = "durable-subscription-binding")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class DurableSubscriptionBindingDTO extends BindingDTO {
+
+ public String destination;
- public long key = -1;
- public AsciiBuffer name;
- public AsciiBuffer queueType;
+ public String filter;
-// public AsciiBuffer parent;
+ @XmlAttribute(name="client-id")
+ public String client_id;
+ @XmlAttribute(name="subscription-id")
+ public String subscription_id;
}
\ No newline at end of file
Copied: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/JsonCodec.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/JsonCodec.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/JsonCodec.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/JsonCodec.java Sat Jul 17 00:12:48 2010
@@ -16,36 +16,34 @@
*/
package org.apache.activemq.apollo.dto;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-import java.util.ArrayList;
-
-import javax.xml.bind.annotation.*;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.stream.XMLStreamException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Properties;
/**
+ *
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name = "virtual-host")
-@XmlAccessorType(XmlAccessType.FIELD)
-public class VirtualHostDTO extends ServiceDTO<String> {
-
- @XmlElement(name="host-name", required=true)
- public ArrayList<String> host_names = new ArrayList<String>();
-
- @XmlElementRef
- public StoreDTO store;
-
- /**
- * Should queues be auto created when they are first accessed
- * by clients?
- */
- @JsonProperty("auto_create_queues")
- @XmlAttribute(name="auto-create-queues")
- public boolean auto_create_queues = true;
-
- /**
- * Should queues be purged on startup?
- */
- @XmlAttribute(name="purge-on-startup")
- public boolean purge_on_startup = false;
+public class JsonCodec {
+ private static ObjectMapper mapper = new ObjectMapper();
+
+ static public <T> T decode(Buffer buffer, Class<T> type) throws IOException {
+ return mapper.readValue(buffer.in(), type);
+ }
+
+ static public Buffer encode(Object value) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ mapper.writeValue(baos, value);
+ return baos.toBuffer();
+ }
+
}
Copied: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/PointToPointBindingDTO.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/PointToPointBindingDTO.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/PointToPointBindingDTO.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/PointToPointBindingDTO.java Sat Jul 17 00:12:48 2010
@@ -14,20 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.store;
+package org.apache.activemq.apollo.dto;
-import org.fusesource.hawtbuf.AsciiBuffer;
+import org.codehaus.jackson.annotate.JsonTypeInfo;
+import javax.xml.bind.annotation.*;
/**
+ * <p>
+ * </p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class QueueRecord {
-
- public long key = -1;
- public AsciiBuffer name;
- public AsciiBuffer queueType;
-
-// public AsciiBuffer parent;
+@XmlRootElement(name = "queue-binding")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class PointToPointBindingDTO extends BindingDTO {
+
+ /**
+ * A label that describes the binding
+ */
+ @XmlAttribute
+ public String destination;
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Sat Jul 17 00:12:48 2010
@@ -38,6 +38,9 @@ public class QueueStatusDTO extends Long
@XmlAttribute
public long id;
+ @XmlAttribute
+ public String label;
+
@XmlAttribute(name="enqueue-item-counter")
public long enqueue_item_counter;
Modified: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Sat Jul 17 00:12:48 2010
@@ -48,4 +48,12 @@ public class VirtualHostDTO extends Serv
*/
@XmlAttribute(name="purge-on-startup")
public boolean purge_on_startup = false;
+
+ /**
+ * Holds the configuration for the destinations.
+ */
+ @XmlElement(name="destination")
+ public ArrayList<DestinationDTO> destinations = new ArrayList<DestinationDTO>();
+
+
}
Copied: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlCodec.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlEncoderDecoder.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlCodec.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlCodec.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlEncoderDecoder.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlEncoderDecoder.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlCodec.java Sat Jul 17 00:12:48 2010
@@ -33,7 +33,7 @@ import java.util.regex.Pattern;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class XmlEncoderDecoder {
+public class XmlCodec {
/**
* Changes ${property} with values from a properties object
Copied: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlEncoderDecoderTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlEncoderDecoderTest.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlEncoderDecoderTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java Sat Jul 17 00:12:48 2010
@@ -27,7 +27,7 @@ import static junit.framework.Assert.*;
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class XmlEncoderDecoderTest {
+public class XmlCodecTest {
private InputStream resource(String path) {
return getClass().getResourceAsStream(path);
@@ -35,7 +35,7 @@ public class XmlEncoderDecoderTest {
@Test
public void unmarshalling() throws Exception {
- BrokerDTO dto = XmlEncoderDecoder.unmarshalBrokerDTO(resource("simple.xml"));
+ BrokerDTO dto = XmlCodec.unmarshalBrokerDTO(resource("simple.xml"));
assertNotNull(dto);
assertEquals("default", dto.id);
assertEquals(true, dto.enabled);
@@ -66,7 +66,7 @@ public class XmlEncoderDecoderTest {
broker.connectors.add(connector);
broker.basedir = "./activemq-data/default";
- XmlEncoderDecoder.marshalBrokerDTO(broker, System.out, true);
+ XmlCodec.marshalBrokerDTO(broker, System.out, true);
}
Modified: activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/proto/data.proto?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/proto/data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/proto/data.proto Sat Jul 17 00:12:48 2010
@@ -69,9 +69,10 @@ message AddMessage {
message AddQueue {
required int64 key=1;
- optional bytes name = 2 [java_override_type = "AsciiBuffer"];
- optional bytes queueType = 3 [java_override_type = "AsciiBuffer"];
+ optional bytes binding_kind = 2 [java_override_type = "AsciiBuffer"];
+ optional bytes binding_data = 3;
}
+
message RemoveQueue {
required int64 key=1;
}
@@ -149,9 +150,7 @@ message DatabaseRootRecord {
message QueueRootRecord {
optional AddQueue info=1;
- optional int64 size=2;
- optional int64 count=3;
- optional fixed32 entryIndexPage=4;
- optional fixed32 trackingIndexPage=5;
+ optional fixed32 entryIndexPage=2;
+ optional fixed32 trackingIndexPage=3;
}
Modified: activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala Sat Jul 17 00:12:48 2010
@@ -224,8 +224,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
def addQueue(record: QueueRecord, callback:Runnable) = {
val update = new AddQueue.Bean()
update.setKey(record.key)
- update.setName(record.name)
- update.setQueueType(record.queueType)
+ update.setBindingKind(record.binding_kind)
+ update.setBindingData(record.binding_data)
_store(update, callback)
}
@@ -280,26 +280,18 @@ class HawtDBClient(hawtDBStore: HawtDBSt
rc
}
- def getQueueStatus(queueKey: Long): Option[QueueStatus] = {
+ def getQueue(queueKey: Long): Option[QueueRecord] = {
withTx { tx =>
val helper = new TxHelper(tx)
import helper._
val queueRecord = queueIndex.get(queueKey)
if (queueRecord != null) {
- val rc = new QueueStatus
- rc.record = new QueueRecord
- rc.record.key = queueKey
- rc.record.name = queueRecord.getInfo.getName
- rc.record.queueType = queueRecord.getInfo.getQueueType
- rc.count = queueRecord.getCount.toInt
- rc.size = queueRecord.getSize
-
- // TODO
- // rc.first =
- // rc.last =
-
- Some(rc)
+ val record = new QueueRecord
+ record.key = queueKey
+ record.binding_kind = queueRecord.getInfo.getBindingKind
+ record.binding_data = queueRecord.getInfo.getBindingData
+ Some(record)
} else {
None
}
@@ -815,12 +807,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
if (existing == null) {
val previous = entryIndex.put(queueSeq, x.freeze)
if (previous == null) {
-
- val queueRecordUpdate = queueRecord.copy
- queueRecordUpdate.setCount(queueRecord.getCount + 1)
- queueRecordUpdate.setSize(queueRecord.getSize + x.getSize)
- queueIndex.put(queueKey, queueRecordUpdate.freeze)
-
addAndGet(messageRefsIndex, new jl.Long(messageKey), 1)
} else {
// TODO perhaps treat this like an update?
Modified: activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala Sat Jul 17 00:12:48 2010
@@ -160,9 +160,9 @@ class HawtDBStore extends Store with Bas
}
}
- def getQueueStatus(queueKey: Long)(callback: (Option[QueueStatus]) => Unit) = {
+ def getQueue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
executor_pool {
- callback( client.getQueueStatus(queueKey) )
+ callback( client.getQueue(queueKey) )
}
}