You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/14 20:47:40 UTC
svn commit: r1373022 - in /activemq/activemq-apollo/trunk:
apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/
apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/
apollo-broker/src/main/proto/ apollo-broker/src/main/res...
Author: chirino
Date: Tue Aug 14 18:47:38 2012
New Revision: 1373022
URL: http://svn.apache.org/viewvc?rev=1373022&view=rev
Log:
Decoupled protocols from message codecs. Simplified creating a protocol. Should be simpler to customize UDP protocol handling. Added support for auth and virtual host host selection to UDP protocols.
Added:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
- copied, changed from r1373021, activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
- copied, changed from r1373021, activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
- copied, changed from r1373021, activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
Removed:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DetectDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Copied: activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index (from r1373021, activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index?p2=activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index&p1=activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index&r1=1373021&r2=1373022&rev=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index Tue Aug 14 18:47:38 2012
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.stomp.StompProtocolFactory
\ No newline at end of file
+org.apache.activemq.apollo.amqp.AmqpMessageCodecFactory
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index Tue Aug 14 18:47:38 2012
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.amqp.AmqpProtocolCodecFactory
\ No newline at end of file
+org.apache.activemq.apollo.amqp.AmqpProtocolCodec
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala Tue Aug 14 18:47:38 2012
@@ -36,7 +36,7 @@ object AmqpCodec extends Log {
message match {
case message:AMQPMessage =>
val rc = new MessageRecord
- rc.protocol = PROTOCOL_ID
+ rc.codec = PROTOCOL_ID
rc.buffer = message.payload
rc
case _ => throw new RuntimeException("Invalid message type");
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala Tue Aug 14 18:47:38 2012
@@ -18,8 +18,7 @@ package org.apache.activemq.apollo.amqp
import _root_.org.fusesource.hawtbuf._
import org.apache.activemq.apollo.broker._
-import java.lang.String
-import protocol.{ProtocolCodecFactory, ProtocolFactory, Protocol}
+import org.apache.activemq.apollo.broker.protocol.{MessageCodecFactory, MessageCodec, ProtocolCodecFactory, Protocol}
import org.apache.activemq.apollo.broker.store._
import AmqpCodec._
import org.fusesource.amqp.codec.AMQPProtocolCodec
@@ -51,27 +50,24 @@ class AmqpProtocolCodecFactory extends P
}
}
-class AmqpProtocolFactory extends ProtocolFactory {
-
- def create() = AmqpProtocol
-
- def create(config: String) = if(config == PROTOCOL) {
- AmqpProtocol
- } else {
- null
- }
-
-}
-
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
object AmqpProtocol extends AmqpProtocolCodecFactory with Protocol {
-
def createProtocolHandler = new AmqpProtocolHandler
+}
+
+object AmqpMessageCodecFactory extends MessageCodecFactory.Provider {
+ def create = Array[MessageCodec](AmqpMessageCodec)
+}
+
+ /**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object AmqpMessageCodec extends MessageCodec {
+ def id = PROTOCOL
def encode(message: Message) = AmqpCodec.encode(message)
def decode(message: MessageRecord) = AmqpCodec.decode(message)
-
}
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Tue Aug 14 18:47:38 2012
@@ -54,7 +54,7 @@ object AMQPMessage {
case class AMQPMessage(payload:Buffer) extends org.apache.activemq.apollo.broker.Message {
import AmqpProtocolHandler._
- def protocol = AmqpProtocol
+ def codec = AmqpMessageCodec
var _annotated:Envelope = _
def annotated = {
@@ -518,7 +518,7 @@ class AmqpProtocolHandler extends Protoc
// frame = frame.append_headers((include_seq.get, ascii(delivery.seq.toString))::Nil)
// }
- var annotated = if( message.protocol eq AmqpProtocol ) {
+ var annotated = if( message.codec eq AmqpMessageCodec ) {
val original = message.asInstanceOf[AMQPMessage].annotated
var annotated = new Envelope
annotated.setHeader(header)
@@ -530,8 +530,8 @@ class AmqpProtocolHandler extends Protoc
} else {
val (body, content_type) = protocol_convert match{
- case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.protocol.id()+";conv=body")
- case _ => (message.encoded, "protocol/"+message.protocol.id())
+ case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.codec.id+";conv=body")
+ case _ => (message.encoded, "protocol/"+message.codec.id)
}
val bare = new types.Message
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Tue Aug 14 18:47:38 2012
@@ -24,7 +24,7 @@ option java_multiple_files = true;
message MessagePB {
required int64 messageKey=1;
- required bytes protocol = 2 [java_override_type = "AsciiBuffer"];
+ required bytes codec = 2 [java_override_type = "AsciiBuffer"];
optional int32 size = 3;
optional bytes value = 4;
optional sint64 expiration = 5;
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index Tue Aug 14 18:47:38 2012
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.protocol.RawMessageCodec
\ No newline at end of file
+org.apache.activemq.apollo.broker.protocol.RawMessageCodecFactory
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Tue Aug 14 18:47:38 2012
@@ -14,6 +14,5 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.protocol.AnyProtocolFactory
-org.apache.activemq.apollo.broker.protocol.UdpProtocolFactory
-org.apache.activemq.apollo.broker.protocol.RawProtocolFactory
\ No newline at end of file
+org.apache.activemq.apollo.broker.protocol.AnyProtocol
+org.apache.activemq.apollo.broker.protocol.UdpProtocol
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Tue Aug 14 18:47:38 2012
@@ -272,10 +272,18 @@ class Broker() extends BaseService with
rc
})
+ @volatile
var default_virtual_host: VirtualHost = null
val virtual_hosts = LinkedHashMap[AsciiBuffer, VirtualHost]()
val virtual_hosts_by_hostname = new LinkedHashMap[AsciiBuffer, VirtualHost]()
+ /**
+ * This is a copy of the virtual_hosts_by_hostname variable which
+ * can be accessed by any thread.
+ */
+ @volatile
+ var cow_virtual_hosts_by_hostname = Map[AsciiBuffer, VirtualHost]()
+
val connectors = LinkedHashMap[String, Connector]()
val connections = LinkedHashMap[Long, BrokerConnection]()
@@ -389,6 +397,7 @@ class Broker() extends BaseService with
)
virtual_hosts.clear()
virtual_hosts_by_hostname.clear()
+ cow_virtual_hosts_by_hostname = Map()
Option(web_server).foreach(tracker.stop(_))
web_server = null
@@ -541,6 +550,8 @@ class Broker() extends BaseService with
}
}
+ cow_virtual_hosts_by_hostname = virtual_hosts_by_hostname.toMap
+
// first defined host is the default virtual host
config.virtual_hosts.headOption.map(x=>ascii(x.id)).foreach { id =>
default_virtual_host = virtual_hosts.get(id).getOrElse(null)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Tue Aug 14 18:47:38 2012
@@ -17,7 +17,7 @@
package org.apache.activemq.apollo.broker
import org.fusesource.hawtdispatch._
-import protocol.{ProtocolFactory, Protocol}
+import org.apache.activemq.apollo.broker.protocol.{ProtocolFactory, Protocol}
import org.fusesource.hawtdispatch.transport._
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.util.OptionSupport._
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Tue Aug 14 18:47:38 2012
@@ -23,7 +23,7 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.apollo.util.Log
import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
import org.apache.activemq.apollo.dto.DestinationDTO
-import org.apache.activemq.apollo.broker.protocol.{ProtocolFactory, Protocol}
+import org.apache.activemq.apollo.broker.protocol.{MessageCodec, Protocol, ProtocolFactory}
import scala.Array
object DeliveryProducer extends Log
@@ -99,11 +99,11 @@ trait DeliverySession extends SessionSin
trait Message extends Filterable with Retained {
/**
- * The protocol of the message
+ * The encoder/decoder of the message
*/
- def protocol:Protocol
+ def codec:MessageCodec
- def encoded:Buffer = protocol.encode(this).buffer
+ def encoded:Buffer = codec.encode(this).buffer
}
@@ -243,7 +243,7 @@ class Delivery {
}
def createMessageRecord() = {
- val record = message.protocol.encode(message)
+ val record = message.codec.encode(message)
record.locator = storeLocator
record
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Tue Aug 14 18:47:38 2012
@@ -17,7 +17,7 @@
package org.apache.activemq.apollo.broker
import org.fusesource.hawtdispatch._
-import protocol.ProtocolFactory
+import org.apache.activemq.apollo.broker.protocol.{MessageCodecFactory, ProtocolFactory, Protocol}
import org.apache.activemq.apollo.broker.store._
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.util.list._
@@ -732,7 +732,7 @@ class QueueEntry(val queue:Queue, val se
queue.swapping_in_size -= size
val delivery = to_delivery
- delivery.message = ProtocolFactory.get(messageRecord.protocol.toString).get.decode(messageRecord)
+ delivery.message = MessageCodecFactory(messageRecord.codec.toString).get.decode(messageRecord)
space += delivery
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Tue Aug 14 18:47:38 2012
@@ -70,7 +70,12 @@ abstract class Sink[T] {
}
}
}
+}
+case class BlackHoleSink[T]() extends Sink[T] {
+ var refiller:Task = null
+ def full = false
+ def offer(value: T) = true
}
trait SinkFilter[T] {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala Tue Aug 14 18:47:38 2012
@@ -35,61 +35,20 @@ import transport.{Transport, TransportAw
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class AnyProtocolFactory extends ProtocolFactory {
-
- def all_protocols: Array[Protocol] = ((ProtocolFactory.finder.singletons.map(_.create())).filter(_.isIdentifiable)).toArray
-
- def create() = {
- new AnyProtocol(()=>all_protocols)
- }
-
- def create(config: String): Protocol = {
- val MULTI = "any"
- val MULTI_PREFIXED = "any:"
-
- if (config == MULTI) {
- return new AnyProtocol(()=>all_protocols)
- } else if (config.startsWith(MULTI_PREFIXED)) {
- var names: Array[String] = config.substring(MULTI_PREFIXED.length).split(',')
- var protocols: Array[Protocol] = (names.flatMap {x => ProtocolFactory.get(x.trim)}).toArray
- return new AnyProtocol(()=>protocols)
- }
- return null
- }
-
-}
-
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class AnyProtocol(val func: ()=>Array[Protocol]) extends Protocol {
-
- lazy val protocols: Array[Protocol] = func()
+class AnyProtocol() extends BaseProtocol {
def id = "any"
- def createProtocolCodec = new AnyProtocolCodec(protocols)
+ def createProtocolCodec = new AnyProtocolCodec()
def createProtocolHandler = new AnyProtocolHandler
-
- def encode(message: Message) = throw new UnsupportedOperationException
-
- def decode(message: MessageRecord) = throw new UnsupportedOperationException
-
- def isIdentifiable = false
-
- def maxIdentificaionLength = throw new UnsupportedOperationException()
-
- def matchesIdentification(buffer: Buffer) = throw new UnsupportedOperationException()
-
}
case class ProtocolDetected(id:String, codec:ProtocolCodec)
-class AnyProtocolCodec(val protocols: Array[Protocol]) extends ProtocolCodec with TransportAware {
+class AnyProtocolCodec() extends ProtocolCodec with TransportAware {
+
+ var protocols = ProtocolFactory.protocols.filter(_.isIdentifiable)
if (protocols.isEmpty) {
throw new IllegalArgumentException("No protocol configured for identification.")
@@ -190,13 +149,20 @@ class AnyProtocolHandler extends Protoco
connection.transport.resumeRead
import OptionSupport._
import collection.JavaConversions._
-
+
+ var codec = connection.transport.getProtocolCodec().asInstanceOf[AnyProtocolCodec]
+
val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
config = connector_config.protocols.flatMap{ _ match {
case x:DetectDTO => Some(x)
case _ => None
}}.headOption.getOrElse(new DetectDTO)
-
+
+ if( config.protocols!=null ) {
+ val protocols = Set(config.protocols.split("\\s+").filter( _.length!=0 ):_*)
+ codec.protocols = codec.protocols.filter(x=> protocols.contains(x.id))
+ }
+
val timeout = config.timeout.getOrElse(5000L)
// Make sure client connects eventually...
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala Tue Aug 14 18:47:38 2012
@@ -22,12 +22,17 @@ import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.util.{Log, ClassFinder}
import org.apache.activemq.apollo.broker.{Broker, Message, BrokerConnection}
import org.apache.activemq.apollo.dto.{SimpleProtocolFilterDTO, ProtocolFilterDTO, ConnectionStatusDTO}
+import org.fusesource.hawtbuf.Buffer
+import scala.collection.mutable.ListBuffer
-trait Protocol {
- def id:String
+trait Protocol extends ProtocolCodecFactory.Provider {
def createProtocolHandler:ProtocolHandler
- def encode(message:Message):MessageRecord
- def decode(message:MessageRecord):Message
+}
+
+abstract class BaseProtocol extends Protocol {
+ def isIdentifiable = false
+ def maxIdentificaionLength = throw new UnsupportedOperationException()
+ def matchesIdentification(buffer: Buffer) = throw new UnsupportedOperationException()
}
/**
@@ -36,15 +41,37 @@ trait Protocol {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object Protocol {
+object ProtocolFactory {
- val finder = new ClassFinder[Protocol]("META-INF/services/org.apache.activemq.apollo/protocol-factory.index",classOf[Protocol])
+ private def finder = new ClassFinder[Protocol]("META-INF/services/org.apache.activemq.apollo/protocol-factory.index", classOf[Protocol])
+ val protocols:Array[Protocol] = finder.singletons.toArray
+ val protocols_by_id = Map(protocols.map(x=> (x.id, x)): _*)
+ def get(name:String):Option[Protocol] = protocols_by_id.get(name)
- val protocols = Map((for( provider <- finder.singletons ) {
- yield (provider.id, provider)
- }): _*)
+}
- def get(name:String):Option[Protocol] = protocols.get(name)
+trait MessageCodec {
+ def id():String
+ def encode(message:Message):MessageRecord
+ def decode(message:MessageRecord):Message
+}
+
+object MessageCodecFactory {
+
+ trait Provider {
+ def create:Array[MessageCodec]
+ }
+
+ val codecs:Array[MessageCodec] = {
+ val finder = new ClassFinder[Provider]("META-INF/services/org.apache.activemq.apollo/message-codec-factory.index", classOf[Provider])
+ val rc = ListBuffer[MessageCodec]()
+ for( provider <- finder.singletons; codec <- provider.create ) {
+ rc += codec
+ }
+ rc.toArray
+ }
+ val codecs_by_id = Map(codecs.map(x=> (x.id, x)): _*)
+ def apply(id:String) = codecs_by_id.get(id)
}
object ProtocolHandler extends Log
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala Tue Aug 14 18:47:38 2012
@@ -24,6 +24,10 @@ import org.apache.activemq.apollo.broker
import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
import org.fusesource.hawtdispatch.transport.ProtocolCodec
+object RawMessageCodecFactory extends MessageCodecFactory.Provider {
+ def create = Array[MessageCodec](RawMessageCodec)
+}
+
/**
* <p>
* </p>
@@ -35,7 +39,7 @@ object RawMessageCodec extends MessageCo
val PROTOCOL_ID = new AsciiBuffer(id)
def id = "raw"
- override def encode(message: Message):MessageRecord = {
+ def encode(message: Message):MessageRecord = {
message match {
case message:RawMessage =>
val rc = new MessageRecord
@@ -46,13 +50,10 @@ object RawMessageCodec extends MessageCo
}
}
- override def decode(message: MessageRecord) = {
+ def decode(message: MessageRecord) = {
assert( message.codec == PROTOCOL_ID )
RawMessage(message.buffer)
}
-
- def createProtocolHandler = throw new UnsupportedOperationException
- def createProtocolCodec = throw new UnsupportedOperationException
}
case class RawMessage(payload:Buffer) extends Message {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala Tue Aug 14 18:47:38 2012
@@ -79,86 +79,156 @@ class UdpProtocolCodec extends ProtocolC
object UdpProtocolHandler extends Log
-class UdpMessage {
+trait DecodedUdpMessage {
+ /**
+ * @return The virtual host the message should get routed to.
+ * return null to route to the default virtual host
+ */
+ def host:VirtualHost
+
+ /**
+ * @return The destination name to route the message to.
+ */
+ def address:AsciiBuffer
+
+ /**
+ * @return The delivery to route.
+ */
+ def delivery:Delivery
+
+ /**
+ * @return The original UdpMesasge that the DecodedUdpMessage was
+ * constructed from.
+ */
+ def message: UdpMessage
+
+ /**
+ * @return The SecurityContext to authenticate and authorize against. Return
+ * null if you want to bypass authentication and authorization.
+ */
+ def security_context:SecurityContext
}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class UdpProtocolHandler extends ProtocolHandler {
+abstract class UdpProtocolHandler extends ProtocolHandler {
import UdpProtocolHandler._
def protocol = "udp"
def session_id = None
- var buffer_size = 0
- var host:VirtualHost = _
+ var buffer_size = 64*1024
var connection_log:Log = _
var config:UdpDTO = _
+ var messages_received = 0L
+ var waiting_on = "client request"
def broker = connection.connector.broker
def queue = connection.dispatch_queue
+ override def create_connection_status = {
+ var rc = super.create_connection_status
+ rc.waiting_on = waiting_on
+ rc.messages_received = messages_received
+ rc
+ }
+
+ def configure(config:UdpDTO) = {
+ this.config = config
+ buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
+ }
+
override def on_transport_connected = {
connection.transport.resumeRead
import collection.JavaConversions._
- config = (connection.connector.config match {
+ configure((connection.connector.config match {
case connector_config:AcceptingConnectorDTO =>
connector_config.protocols.flatMap{ _ match {
case x:UdpDTO => Some(x)
case _ => None
}}.headOption
case _ => None
- }).getOrElse(new UdpDTO)
+ }).getOrElse(new UdpDTO) )
+ }
- buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
- decoder.init(this)
+ def suspend_read(reason: String) = {
+ waiting_on = reason
+ connection.transport.suspendRead
+ }
- broker.dispatch_queue {
- var host = broker.get_default_virtual_host
- queue {
- this.host = host
- connection_log = this.host.connection_log
- connection.transport.resumeRead()
- if(host==null) {
- warn("Could not find default virtual host")
- connection.stop(NOOP)
- }
- }
- }
-
+ def resume_read() = {
+ waiting_on = "client request"
+ connection.transport.resumeRead
}
- var producerRoutes = new LRUCache[AsciiBuffer, StompProducerRoute](1000) {
- override def onCacheEviction(eldest: Entry[AsciiBuffer, StompProducerRoute]) = {
- host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
+
+ var producerRoutes = new LRUCache[(VirtualHost, AsciiBuffer, SecurityContext#Key), UdpProducerRoute](1000) {
+ override def onCacheEviction(eldest: Entry[(VirtualHost, AsciiBuffer, SecurityContext#Key), UdpProducerRoute]) = {
+ val (host, address, key) = eldest.getKey
+ val route = eldest.getValue
+ host.dispatch_queue {
+ host.router.disconnect(route.addresses, route)
+ }
}
}
override def on_transport_command(command: AnyRef) = {
- val msg = command.asInstanceOf[UdpMessage]
- val address = decoder.address(msg)
- var route = producerRoutes.get(address);
- if( route == null ) {
- route = new StompProducerRoute(address)
- producerRoutes.put(address, route)
- val security_context = new SecurityContext
- security_context.connector_id = connection.connector.id
- security_context.local_address = connection.transport.getLocalAddress
- host.dispatch_queue {
- val rc = host.router.connect(route.addresses, route, security_context)
- if( rc.isDefined ) {
-
+ decode(command.asInstanceOf[UdpMessage]) match {
+ case Some(msg) =>
+ messages_received += 1
+ val address = msg.address
+ var host = msg.host
+ if( host == null ) {
+ host = broker.default_virtual_host
}
- }
+ val security_context = msg.security_context
+ var sc_key = if( security_context!=null) security_context.to_key else null
+ var route = producerRoutes.get((host, address, sc_key));
+ if( route == null ) {
+ route = new UdpProducerRoute(host, address)
+ producerRoutes.put((host, address, sc_key), route)
+
+ def fail_connect = {
+ // Just drop messages..
+ route.sink_switch.downstream = Some(BlackHoleSink())
+ }
+
+ def continue_connect = host.dispatch_queue {
+ host.router.connect(route.addresses, route, security_context) match {
+ case Some(error) => queue {
+ fail_connect
+ }
+ case None =>
+ }
+ }
+
+ if( security_context!=null && host.authenticator!=null && host.authorizer!=null ) {
+ suspend_read("authenticating")
+ host.authenticator.authenticate(security_context) { auth_failure=>
+ queue {
+ resume_read
+ auth_failure match {
+ case null=> continue_connect
+ case auth_failure=> fail_connect
+ }
+ }
+ }
+ } else {
+ continue_connect
+ }
+ }
+
+ route.send(msg)
+
+ case None =>
}
- route.send(msg);
}
- class StompProducerRoute(dest: AsciiBuffer) extends DeliveryProducerRoute(host.router) {
- val addresses = decoder.decode_addresses(dest)
+ class UdpProducerRoute(host:VirtualHost, dest: AsciiBuffer) extends DeliveryProducerRoute(host.router) {
+ val addresses = decode_address(dest.toString)
val key = addresses.toList
override def send_buffer_size = buffer_size
@@ -179,20 +249,20 @@ class UdpProtocolHandler extends Protoco
sink_switch.downstream = Some(this)
}
- def send(frame:UdpMessage) = {
+ def send(frame:DecodedUdpMessage) = {
// Drop older entries to make room for this new one..
while( inbound_queue_size >= buffer_size ) {
inbound_queue.removeFirst
}
- val delivery = decode_delivery(frame)
+ val delivery = frame.delivery
inbound_queue_size += delivery.size
inbound_queue.offer(delivery)
}
}
-
- abstract def decode_delivery(message: UdpMessage):Delivery
+ def decode(message: UdpMessage):Option[DecodedUdpMessage]
+ def decode_address(address:String):Array[SimpleAddress]
}
/**
@@ -202,29 +272,40 @@ class UdpProtocolHandler extends Protoco
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class UdpProtocol extends Protocol {
+class UdpProtocol extends BaseProtocol {
def id = "udp"
def createProtocolCodec:ProtocolCodec = new UdpProtocolCodec()
+
def createProtocolHandler:ProtocolHandler = new UdpProtocolHandler {
+ var default_host:VirtualHost = _
var topic_address:AsciiBuffer = _
- var topic_address_decoded:Array[SimpleAddress] = _
- def init(handler:UdpProtocolHandler) = {
- val topic_name = Option(handler.config.topic).getOrElse("udp")
- topic_address_decoded = LocalRouter.destination_parser.decode_multi_destination(topic_name, (name)=> LocalRouter.destination_parser.decode_single_destination("topic:"+name, null))
+ override def configure(config: UdpDTO) {
+ super.configure(config)
+ val topic_address_decoded = decode_address(Option(config.topic).getOrElse("udp"))
topic_address = new AsciiBuffer(LocalRouter.destination_parser.encode_destination(topic_address_decoded))
+ default_host = broker.default_virtual_host
}
- def address(message: UdpMessage) = topic_address
- def decode_addresses(value: AsciiBuffer) = topic_address_decoded
- def decode_delivery(message: UdpMessage) = {
- val delivery = new Delivery
- delivery.size = message.buffer.remaining()
- delivery.message = RawMessage(new Buffer(message.buffer))
- delivery
+ def decode_address(address:String):Array[SimpleAddress] = {
+ LocalRouter.destination_parser.decode_multi_destination(address, (name)=> LocalRouter.destination_parser.decode_single_destination("topic:"+name, null))
}
+ def decode(udp: UdpMessage) = Some(new DecodedUdpMessage {
+ def message = udp
+ def host = null
+ def security_context = null
+ def address = topic_address
+
+ def delivery = {
+ val delivery = new Delivery
+ delivery.size = message.buffer.remaining()
+ delivery.message = RawMessage(new Buffer(message.buffer))
+ delivery
+ }
+ })
+
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala Tue Aug 14 18:47:38 2012
@@ -23,6 +23,7 @@ import java.net.SocketAddress
import org.apache.activemq.apollo.broker.Broker.BLOCKABLE_THREAD_POOL
import org.fusesource.hawtdispatch._
import javax.security.auth.login.LoginContext
+import scala.collection.mutable.ListBuffer
/**
* <p>
@@ -42,6 +43,17 @@ class SecurityContext {
var login_context:LoginContext = _
var session_id:Option[String] = None
+ case class Key(user:String,
+ password:String,
+ sso_token:String,
+ certificates:ListBuffer[X509Certificate],
+ connector_id:String,
+ local_address:SocketAddress,
+ remote_address:SocketAddress)
+
+ def to_key = Key(user, password, sso_token, if(certificates==null) ListBuffer() else ListBuffer(certificates : _*),
+ connector_id, local_address, remote_address)
+
def credential_dump = {
var rc = List[String]()
if(certificates!=null) {
@@ -66,7 +78,6 @@ class SecurityContext {
}
private var _subject:Subject = _
-
def subject = _subject
private var _principals = Set[Principal]()
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala Tue Aug 14 18:47:38 2012
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.Atomi
class MessageRecord {
var key = -1L
- var protocol: AsciiBuffer = _
+ var codec: AsciiBuffer = _
var buffer: Buffer = _
var compressed: Buffer = _
var direct_buffer: DirectBuffer = _
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Tue Aug 14 18:47:38 2012
@@ -30,7 +30,7 @@ object PBSupport {
implicit def to_pb(v: MessageRecord):MessagePB.Bean = {
val pb = new MessagePB.Bean
pb.setMessageKey(v.key)
- pb.setProtocol(v.protocol)
+ pb.setCodec(v.codec)
pb.setValue(v.buffer)
pb
}
@@ -38,7 +38,7 @@ object PBSupport {
implicit def from_pb(pb: MessagePB.Getter):MessageRecord = {
val rc = new MessageRecord
rc.key = pb.getMessageKey
- rc.protocol = pb.getProtocol
+ rc.codec = pb.getCodec
rc.buffer = pb.getValue
rc
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Tue Aug 14 18:47:38 2012
@@ -92,7 +92,7 @@ abstract class StoreFunSuiteSupport exte
def add_message(batch:StoreUOW, content:String) = {
var message = new MessageRecord
- message.protocol = ascii("test-protocol")
+ message.codec = ascii("test-protocol")
message.buffer = ascii(content).buffer
message.locator = new AtomicReference[Object]()
val key = batch.store(message)
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DetectDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DetectDTO.java?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DetectDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DetectDTO.java Tue Aug 14 18:47:38 2012
@@ -39,6 +39,9 @@ public class DetectDTO extends ProtocolD
@XmlAttribute(name="timeout")
public Long timeout;
+ @XmlAttribute(name="protocols")
+ public String protocols;
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java Tue Aug 14 18:47:38 2012
@@ -33,13 +33,6 @@ import javax.xml.bind.annotation.XmlRoot
@JsonIgnoreProperties(ignoreUnknown = true)
public class UdpDTO extends ProtocolDTO {
- /**
- * Class name of the decoder that will be used to interpret the
- * UDP message
- */
- @XmlAttribute(name="decoder")
- public String decoder;
-
@XmlAttribute(name="buffer_size")
public String buffer_size;
@@ -48,13 +41,24 @@ public class UdpDTO extends ProtocolDTO
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
-
- UdpDTO detectDTO = (UdpDTO) o;
-
- if (decoder != null ? !decoder.equals(detectDTO.decoder) : detectDTO.decoder != null) return false;
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof UdpDTO)) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+
+ UdpDTO udpDTO = (UdpDTO) o;
+
+ if (buffer_size != null ? !buffer_size.equals(udpDTO.buffer_size) : udpDTO.buffer_size != null) {
+ return false;
+ }
+ if (topic != null ? !topic.equals(udpDTO.topic) : udpDTO.topic != null) {
+ return false;
+ }
return true;
}
@@ -62,7 +66,8 @@ public class UdpDTO extends ProtocolDTO
@Override
public int hashCode() {
int result = super.hashCode();
- result = 31 * result + (decoder != null ? decoder.hashCode() : 0);
+ result = 31 * result + (buffer_size != null ? buffer_size.hashCode() : 0);
+ result = 31 * result + (topic != null ? topic.hashCode() : 0);
return result;
}
}
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Tue Aug 14 18:47:38 2012
@@ -842,7 +842,7 @@ class LevelDBClient(store: LevelDBStore)
if (message_record != null) {
val pb = new MessagePB.Bean
- pb.setProtocol(message_record.protocol)
+ pb.setCodec(message_record.codec)
val body = if(message_record.compressed!=null) {
pb.setCompression(1)
@@ -1344,7 +1344,7 @@ class LevelDBClient(store: LevelDBStore)
case record: MessagePB.Buffer =>
val pb = new MessagePB.Bean
- pb.setProtocol(record.getProtocol)
+ pb.setCodec(record.getCodec)
val body = if(snappy_compress_logs) {
val compressed = Snappy.compress(record.getValue)
if (compressed.length < record.getValue.length) {
Copied: activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index (from r1373021, activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index?p2=activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index&r1=1373021&r2=1373022&rev=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index Tue Aug 14 18:47:38 2012
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.protocol.RawMessageCodec
\ No newline at end of file
+org.apache.activemq.apollo.openwire.OpenwireMessageCodecFactory
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Tue Aug 14 18:47:38 2012
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.openwire.OpenwireProtocolFactory
\ No newline at end of file
+org.apache.activemq.apollo.openwire.OpenwireProtocol
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala Tue Aug 14 18:47:38 2012
@@ -41,7 +41,7 @@ object OpenwireCodec extends Sizer[Comma
def encode(message: Message):MessageRecord = {
val rc = new MessageRecord
- rc.protocol = PROTOCOL
+ rc.codec = PROTOCOL
val msg = message.asInstanceOf[OpenwireMessage];
rc.buffer = msg.message.getCachedEncoding match {
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala Tue Aug 14 18:47:38 2012
@@ -39,7 +39,7 @@ class OpenwireMessage(val message:Active
def getLocalConnectionId = message.getProducerId.getConnectionId
- def protocol = OpenwireProtocol
+ def codec = OpenwireMessageCodec
def getBodyAs[T](toType : Class[T]) = {
(message match {
@@ -78,7 +78,7 @@ object EndOfBrowseMessage extends Messag
def retained(): Int = 0
def retain() {}
def release() {}
- def protocol: Protocol = null
+ def codec = null
def priority: Byte = 0
def persistent: Boolean = false
def expiration: Long = 0L
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala Tue Aug 14 18:47:38 2012
@@ -20,7 +20,7 @@ package org.apache.activemq.apollo.openw
import org.apache.activemq.apollo.broker.store.MessageRecord
import org.apache.activemq.apollo.broker.Message
import OpenwireConstants._
-import org.apache.activemq.apollo.broker.protocol.{ProtocolCodecFactory, Protocol, ProtocolFactory}
+import org.apache.activemq.apollo.broker.protocol.{MessageCodecFactory, MessageCodec, ProtocolCodecFactory, Protocol}
import org.fusesource.hawtbuf.Buffer
import org.apache.activemq.apollo.util.Log
@@ -30,40 +30,31 @@ import org.apache.activemq.apollo.util.L
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object OpenwireProtocolFactory extends ProtocolFactory {
+object OpenwireProtocol extends OpenwireProtocolCodecFactory with Protocol with Log {
- def create() = OpenwireProtocol
+ def createProtocolHandler = new OpenwireProtocolHandler
- def create(config: String) = if(config == PROTOCOL) {
- OpenwireProtocol
- } else {
+ lazy val log_exerimental_warning = {
+ warn("The OpenWire protocol implementation is still experimental and not recommended for production use. Production users should use ActiveMQ instead.")
null
}
+}
+object OpenwireMessageCodecFactory extends MessageCodecFactory.Provider {
+ def create = Array[MessageCodec](OpenwireMessageCodec)
}
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object OpenwireProtocol extends OpenwireProtocolCodecFactory with Protocol with Log {
+object OpenwireMessageCodec extends MessageCodec {
- def createProtocolHandler = new OpenwireProtocolHandler
+ def id = "openwire"
def encode(message: Message):MessageRecord = {
OpenwireCodec.encode(message)
}
-
def decode(message: MessageRecord) = {
OpenwireCodec.decode(message)
}
- lazy val log_exerimental_warning = {
- warn("The OpenWire protocol implementation is still experimental and not recommended for production use. Production users should use ActiveMQ instead.")
- null
- }
}
/**
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Tue Aug 14 18:47:38 2012
@@ -935,7 +935,7 @@ class OpenwireProtocolHandler extends Pr
override def receive_buffer_size = buffer_size
def matches(delivery:Delivery) = {
- if( delivery.message.protocol eq OpenwireProtocol ) {
+ if( delivery.message.codec eq OpenwireMessageCodec ) {
if( selector_expression!=null ) {
selector_expression.matches(delivery.message)
} else {
Copied: activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index (from r1373021, activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index?p2=activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index&p1=activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index&r1=1373021&r2=1373022&rev=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index Tue Aug 14 18:47:38 2012
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.stomp.StompProtocolFactory
\ No newline at end of file
+org.apache.activemq.apollo.stomp.StompMessageCodecFactory
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Tue Aug 14 18:47:38 2012
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.stomp.StompProtocolFactory
\ No newline at end of file
+org.apache.activemq.apollo.stomp.StompProtocol
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Tue Aug 14 18:47:38 2012
@@ -40,7 +40,7 @@ object StompCodec extends Log {
val frame = message.frame
val rc = new MessageRecord
- rc.protocol = PROTOCOL
+ rc.codec = PROTOCOL
if( frame.content.isInstanceOf[ZeroCopyContent] ) {
rc.direct_buffer = frame.content.asInstanceOf[ZeroCopyContent].zero_copy_buffer
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Tue Aug 14 18:47:38 2012
@@ -37,7 +37,7 @@ import Stomp._
*/
case class StompFrameMessage(frame:StompFrame) extends Message {
- def protocol = StompProtocol
+ def codec = StompMessageCodec
/**
* the globally unique id of the message
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Tue Aug 14 18:47:38 2012
@@ -18,10 +18,8 @@ package org.apache.activemq.apollo.stomp
import _root_.org.fusesource.hawtbuf._
import org.apache.activemq.apollo.broker._
-import java.lang.String
-import protocol.{ProtocolCodecFactory, ProtocolFactory, Protocol}
+import org.apache.activemq.apollo.broker.protocol.{MessageCodecFactory, MessageCodec, ProtocolCodecFactory, Protocol}
import Stomp._
-import org.fusesource.hawtdispatch.transport._
import org.apache.activemq.apollo.broker.store._
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -51,24 +49,23 @@ class StompProtocolCodecFactory extends
}
}
-class StompProtocolFactory extends ProtocolFactory {
-
- def create() = StompProtocol
-
- def create(config: String) = if(config == "stomp") {
- StompProtocol
- } else {
- null
- }
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object StompProtocol extends StompProtocolCodecFactory with Protocol {
+ def createProtocolHandler = new StompProtocolHandler
+}
+object StompMessageCodecFactory extends MessageCodecFactory.Provider {
+ def create = Array[MessageCodec](StompMessageCodec)
}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object StompProtocol extends StompProtocolCodecFactory with Protocol {
+object StompMessageCodec extends MessageCodec{
- def createProtocolHandler = new StompProtocolHandler
+ def id = "stomp"
def encode(message: Message):MessageRecord = {
StompCodec.encode(message.asInstanceOf[StompFrameMessage])
@@ -81,3 +78,5 @@ object StompProtocol extends StompProtoc
}
+
+
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Aug 14 18:47:38 2012
@@ -422,12 +422,12 @@ class StompProtocolHandler extends Proto
val (_, delivery) = event
val message = delivery.message
- var frame = if( message.protocol eq StompProtocol ) {
+ var frame = if( message.codec eq StompMessageCodec ) {
message.asInstanceOf[StompFrameMessage].frame
} else {
val (body, content_type) = protocol_convert match{
- case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.protocol.id()+";conv=body")
- case _ => (message.encoded, "protocol/"+message.protocol.id())
+ case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.codec.id+";conv=body")
+ case _ => (message.encoded, "protocol/"+message.codec.id())
}
message_id_counter += 1
var headers = (MESSAGE_ID -> ascii(session_id.get+message_id_counter)) :: Nil