You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/04/03 22:21:19 UTC
svn commit: r1464191 - in /activemq/activemq-apollo/trunk:
apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protoc...
Author: chirino
Date: Wed Apr 3 20:21:18 2013
New Revision: 1464191
URL: http://svn.apache.org/r1464191
Log:
Removing Option from some interfaces to further reduce dependency on scala.
Removed:
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Wed Apr 3 20:21:18 2013
@@ -329,7 +329,7 @@ class AmqpProtocolHandler extends Protoc
def processConnectionOpen(conn: engine.Connection, onComplete: Task) {
- security_context.remote_application = Some(conn.getRemoteContainer())
+ security_context.remote_application = conn.getRemoteContainer()
suspend_read("host lookup")
broker.dispatch_queue {
@@ -350,7 +350,7 @@ class AmqpProtocolHandler extends Protoc
connection_log = virtual_host.connection_log
host = virtual_host
proton.setLocalContainerId(virtual_host.id)
- security_context.session_id = Some("%s-%x".format(host.config.id, host.session_counter.incrementAndGet))
+ security_context.session_id = "%s-%x".format(host.config.id, host.session_counter.incrementAndGet)
// proton.open()
// callback.onSuccess(response)
if (virtual_host.authenticator != null && virtual_host.authorizer != null) {
@@ -693,7 +693,7 @@ class AmqpProtocolHandler extends Protoc
dest =>
if (dest.domain.startsWith("temp-")) {
temp_destination_map.getOrElseUpdate(dest, {
- val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: dest.path.parts
+ val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id) :: dest.path.parts
SimpleAddress(dest.domain.stripPrefix("temp-"), Path(parts))
})
} else {
@@ -706,7 +706,7 @@ class AmqpProtocolHandler extends Protoc
var dynamic = target.getDynamic()
if (dynamic) {
temp_dest_counter += 1
- val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: LiteralPart(temp_dest_counter.toString) :: Nil
+ val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id) :: LiteralPart(temp_dest_counter.toString) :: Nil
val rc = SimpleAddress("queue", Path(parts))
val actual = new Target();
var address = destination_parser.encode_destination(rc)
@@ -728,7 +728,7 @@ class AmqpProtocolHandler extends Protoc
var dynamic = source.getDynamic()
if (dynamic) {
temp_dest_counter += 1
- val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: LiteralPart(temp_dest_counter.toString) :: Nil
+ val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id) :: LiteralPart(temp_dest_counter.toString) :: Nil
val rc = SimpleAddress("queue", Path(parts))
val actual = new Source();
var address = destination_parser.encode_destination(rc)
@@ -817,7 +817,7 @@ class AmqpProtocolHandler extends Protoc
} else {
dm.getFooter.getValue.asInstanceOf[java.util.Map[AnyRef,AnyRef]]
}
- footer_map.put(ORIGIN, session_id.get)
+ footer_map.put(ORIGIN, session_id)
val message = new AmqpMessage(null, dm)
val d = new Delivery
@@ -941,7 +941,7 @@ class AmqpProtocolHandler extends Protoc
if( delivery.message.codec eq AmqpMessageCodec ) {
if ( noLocal ) {
val origin = delivery.message.asInstanceOf[AmqpMessage].getFooterProperty(ORIGIN)
- if ( origin == session_id.get ) {
+ if ( origin == session_id ) {
return false
}
}
@@ -1045,7 +1045,7 @@ class AmqpProtocolHandler extends Protoc
message_id_counter += 1
val message = new MessageImpl
- message.setMessageId(session_id.get + message_id_counter)
+ message.setMessageId(session_id + message_id_counter)
message.setBody(new Data(new Binary(body.data, body.offset, body.length)))
message.setContentType(content_type)
message.setDurable(apollo_delivery.persistent)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Apr 3 20:21:18 2013
@@ -116,7 +116,7 @@ class BrokerConnection(var connector: Co
var protocol_handler: ProtocolHandler = null;
- def session_id = Option(protocol_handler).flatMap(_.session_id)
+ def session_id = Option(protocol_handler).flatMap(x=> Option(x.session_id))
override def toString = "id: "+id.toString
protected override def _start(on_completed:Task) = {
@@ -164,7 +164,7 @@ class BrokerConnection(var connector: Co
result.connector = connector.id
result.remote_address = Option(transport.getRemoteAddress).map(_.toString).getOrElse(null)
result.local_address = Option(transport.getLocalAddress).map(_.toString).getOrElse(null)
- result.protocol_session_id = protocol_handler.session_id.getOrElse(null)
+ result.protocol_session_id = protocol_handler.session_id
val wf = transport.getProtocolCodec
if( wf!=null ) {
result.write_counter = wf.getWriteCounter
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed Apr 3 20:21:18 2013
@@ -311,8 +311,8 @@ class LocalRouter(val virtual_host:Virtu
for(dest <- get_destination_matches(address.path)) {
if( is_temp(address) ) {
val owner = temp_owner(address).get
- for( connection <- security.session_id) {
- if( (virtual_host.broker.id, connection) != owner ) {
+ if( security.session_id !=null ) {
+ if( (virtual_host.broker.id, security.session_id) != owner ) {
return Some("Not authorized to destroy the temp %s '%s'. Principals=%s".format(dest.resource_kind.id, dest.id, security.principal_dump))
}
}
@@ -338,8 +338,8 @@ class LocalRouter(val virtual_host:Virtu
if( is_temp(bind_address) ) {
temp_owner(bind_address) match {
case Some(owner) =>
- for( connection <- security.session_id) {
- if( (virtual_host.broker.id, connection) != owner ) {
+ if( security.session_id!=null) {
+ if( (virtual_host.broker.id, security.session_id) != owner ) {
return Some("Not authorized to receive from the temporary destination. Principals=%s".format(security.principal_dump))
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Apr 3 20:21:18 2013
@@ -74,6 +74,26 @@ abstract class Sink[T] {
}
}
+abstract class AbstractSinkFilter[Y, T <: Object] extends Sink[Y] {
+ def downstream:Sink[T]
+ def refiller:Task = downstream.refiller
+ def refiller_=(value:Task) { downstream.refiller=value }
+ def full: Boolean = downstream.full
+
+ def offer(value:Y) = {
+ if( full ) {
+ false
+ } else {
+ val opt = filter(value)
+ if( opt !=null ) {
+ downstream.offer(opt)
+ }
+ true
+ }
+ }
+ def filter(value:Y):T
+}
+
case class FullSink[T]() extends Sink[T] {
def refiller:Task = null
def refiller_=(value:Task) = {}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala Wed Apr 3 20:21:18 2013
@@ -135,8 +135,7 @@ class AnyProtocolHandler extends Protoco
var discriminated = false
- def session_id = None
-
+ def session_id = null
var config:DetectDTO = _
override def on_transport_command(command: AnyRef) = {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala Wed Apr 3 20:21:18 2013
@@ -84,7 +84,7 @@ trait ProtocolHandler {
def protocol:String
- def session_id:Option[String]
+ def session_id: String
var connection:BrokerConnection = null;
def defer(func: =>Unit) = connection.defer(func)
@@ -110,20 +110,20 @@ trait ProtocolHandler {
abstract class AbstractProtocolHandler extends ProtocolHandler
-@deprecated(message="Please use the ProtocolFilter2 interface instead", since="1.3")
+@deprecated(message="Please use the ProtocolFilter3 interface instead", since="1.3")
trait ProtocolFilter {
def filter[T](command: T):T
}
-object ProtocolFilter2Factory {
+object ProtocolFilter3Factory {
val providers = new ClassFinder[Provider]("META-INF/services/org.apache.activemq.apollo/protocol-filter-factory.index",classOf[Provider])
trait Provider {
- def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter2
+ def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter3
}
- def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter2 = {
+ def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter3 = {
for( p <- providers.singletons ) {
val rc = p.create(dto, handler)
if( rc!=null ) {
@@ -134,15 +134,19 @@ object ProtocolFilter2Factory {
}
}
-object SimpleProtocolFilter2Factory extends ProtocolFilter2Factory.Provider {
- def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter2 = dto match {
+object SimpleProtocolFilter3Factory extends ProtocolFilter3Factory.Provider {
+ def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter3 = dto match {
case dto:SimpleProtocolFilterDTO =>
val instance = Broker.class_loader.loadClass(dto.kind).newInstance().asInstanceOf[AnyRef]
val filter = instance match {
- case self:ProtocolFilter2 => self
- case self:ProtocolFilter => new ProtocolFilter2() {
- override def filter_inbound[T](command: T): Option[T] = Some(self.filter(command))
- override def filter_outbound[T](command: T): Option[T] = Some(command)
+ case self:ProtocolFilter3 => self
+ case self:ProtocolFilter2 => new ProtocolFilter3() {
+ override def filter_inbound[T<:Object](command: T) = self.filter_inbound(command);
+ override def filter_outbound[T<:Object](command: T) = self.filter_outbound(command);
+ }
+ case self:ProtocolFilter => new ProtocolFilter3() {
+ override def filter_inbound[T<:Object](command: T) = self.filter(command)
+ override def filter_outbound[T<:Object](command: T) = command
}
case null => null
case _ => throw new IllegalArgumentException("Invalid protocol filter type: "+instance.getClass)
@@ -163,13 +167,13 @@ object SimpleProtocolFilter2Factory exte
}
}
-object ProtocolFilter2 {
- def create_filters(dtos:java.util.List[ProtocolFilterDTO], handler:ProtocolHandler):java.util.List[ProtocolFilter2] = {
+object ProtocolFilter3 {
+ def create_filters(dtos:java.util.List[ProtocolFilterDTO], handler:ProtocolHandler):java.util.List[ProtocolFilter3] = {
import collection.JavaConversions._
collection.JavaConversions.seqAsJavaList(create_filters(dtos.toList, handler));
}
- def create_filters(dtos:List[ProtocolFilterDTO], handler:ProtocolHandler):List[ProtocolFilter2] = {
- dtos.map(ProtocolFilter2Factory.create(_, handler))
+ def create_filters(dtos:List[ProtocolFilterDTO], handler:ProtocolHandler):List[ProtocolFilter3] = {
+ dtos.map(ProtocolFilter3Factory.create(_, handler))
}
}
@@ -177,18 +181,38 @@ object ProtocolFilter2 {
* A Protocol filter can filter frames being sent/received to and from a client. It can modify
* the frame or even drop it.
*/
+@deprecated(message="Please use the ProtocolFilter3 interface instead", since="1.7")
abstract class ProtocolFilter2 {
/**
* Filters a command frame received from a client.
* returns None if the filter wants to drop the frame.
*/
- def filter_inbound[T](frame: T):Option[T]
+ def filter_inbound[T](frame: T):T
+
+ /**
+ * Filters a command frame being sent to a client.
+ * Returns null if the filter wants to drop the frame.
+ */
+ def filter_outbound[T](frame: T):T
+}
+
+/**
+ * A Protocol filter can filter frames being sent/received to and from a client. It can modify
+ * the frame or even drop it.
+ */
+abstract class ProtocolFilter3 {
+
+ /**
+ * Filters a command frame received from a client.
+ * Returns null if the filter wants to drop the frame.
+ */
+ def filter_inbound[T<:Object](frame: T):T
/**
* Filters a command frame being sent client.
* returns None if the filter wants to drop the frame.
*/
- def filter_outbound[T](frame: T):Option[T]
+ def filter_outbound[T<:Object](frame: T):T
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala Wed Apr 3 20:21:18 2013
@@ -124,7 +124,8 @@ abstract class UdpProtocolHandler extend
def configClass:Class[ConfigTypeDTO]
def protocol = "udp"
- var session_id:Option[String] = None
+ var session_id:String = null
+
var buffer_size = 640*1024
var connection_log:Log = _
@@ -146,7 +147,7 @@ abstract class UdpProtocolHandler extend
override def on_transport_connected = {
connection.transport.resumeRead
import collection.JavaConversions._
- session_id = Some("%s-%x".format(broker.default_virtual_host.config.id, broker.default_virtual_host.session_counter.incrementAndGet))
+ session_id = "%s-%x".format(broker.default_virtual_host.config.id, broker.default_virtual_host.session_counter.incrementAndGet)
configure(connection.connector.config match {
case connector_config:AcceptingConnectorDTO =>
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala Wed Apr 3 20:21:18 2013
@@ -41,8 +41,8 @@ class SecurityContext {
var local_address:SocketAddress = _
var remote_address:SocketAddress = _
var login_context:LoginContext = _
- var session_id:Option[String] = None
- var remote_application:Option[String] = None
+ var session_id:String = null
+ var remote_application:String = null
case class Key(user:String,
password:String,
Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java Wed Apr 3 20:21:18 2013
@@ -18,8 +18,8 @@ package org.apache.activemq.apollo.mqtt;
import org.apache.activemq.apollo.broker.*;
import org.apache.activemq.apollo.broker.protocol.AbstractProtocolHandler;
-import org.apache.activemq.apollo.broker.protocol.ProtocolFilter2;
-import org.apache.activemq.apollo.broker.protocol.ProtocolFilter2$;
+import org.apache.activemq.apollo.broker.protocol.ProtocolFilter3;
+import org.apache.activemq.apollo.broker.protocol.ProtocolFilter3$;
import org.apache.activemq.apollo.broker.security.SecurityContext;
import org.apache.activemq.apollo.dto.AcceptingConnectorDTO;
import org.apache.activemq.apollo.dto.ProtocolDTO;
@@ -31,7 +31,6 @@ import org.fusesource.hawtdispatch.Dispa
import org.fusesource.hawtdispatch.Task;
import org.fusesource.hawtdispatch.transport.HeartBeatMonitor;
import org.fusesource.mqtt.codec.*;
-import scala.Option;
import java.io.IOException;
import java.util.ArrayList;
@@ -111,14 +110,14 @@ public class MqttProtocolHandler extends
return destination_parser;
}
- final ArrayList<ProtocolFilter2> protocol_filters = new ArrayList<ProtocolFilter2>();
+ final ArrayList<ProtocolFilter3> protocol_filters = new ArrayList<ProtocolFilter3>();
/////////////////////////////////////////////////////////////////////
//
// Bits related setting up a client connection
//
/////////////////////////////////////////////////////////////////////
- public Option<String> session_id() {
+ public String session_id() {
return security_context.session_id();
}
@@ -146,7 +145,7 @@ public class MqttProtocolHandler extends
codec.setMaxMessageLength(get(config.max_message_length, codec.getMaxMessageLength()));
protocol_filters.clear();
- protocol_filters.addAll(ProtocolFilter2$.MODULE$.create_filters(config.protocol_filters, this));
+ protocol_filters.addAll(ProtocolFilter3$.MODULE$.create_filters(config.protocol_filters, this));
security_context.local_address_$eq(connection().transport().getLocalAddress());
security_context.remote_address_$eq(connection().transport().getRemoteAddress());
@@ -172,18 +171,26 @@ public class MqttProtocolHandler extends
};
if (!protocol_filters.isEmpty()) {
- filtering_sink = filtering_sink.flatMap(toScala(new Fn1<Request, Option<Request>>() {
+ final Sink<Request> downstream = filtering_sink;
+ filtering_sink = new AbstractSinkFilter<Request, Request>() {
+
+ @Override
+ public Sink<Request> downstream() {
+ return downstream;
+ }
+
@Override
- public Option<Request> apply(Request x) {
- Option<Request> cur = some(x);
- for (ProtocolFilter2 filter : protocol_filters) {
- if (cur.isDefined()) {
- cur = filter.filter_outbound(cur.get());
+ public Request filter(Request value) {
+ Request cur = value;
+ for (ProtocolFilter3 filter : protocol_filters) {
+ cur = filter.filter_outbound(cur);
+ if(cur == null ) {
+ break;
}
}
return cur;
}
- }));
+ };
}
sink_manager = new SinkMux<Request>(filtering_sink);
connection_sink = new OverflowSink(sink_manager.open());
@@ -360,11 +367,9 @@ public class MqttProtocolHandler extends
public void on_transport_command(Object command) {
try {
if (!protocol_filters.isEmpty()) {
- for (ProtocolFilter2 filter : protocol_filters) {
- Option<Object> opt = filter.filter_inbound(command);
- if (opt.isDefined()) {
- command = opt.get();
- } else {
+ for (ProtocolFilter3 filter : protocol_filters) {
+ command = filter.filter_inbound(command);
+ if (command==null) {
return; // dropping the frame.
}
}
@@ -432,7 +437,7 @@ public class MqttProtocolHandler extends
UTF8Buffer client_id = connect_message.clientId();
security_context.user_$eq(Scala2Java.toString(connect_message.userName()));
security_context.password_$eq(Scala2Java.toString(connect_message.password()));
- security_context.session_id_$eq(Scala2Java.some(client_id.toString()));
+ security_context.session_id_$eq(client_id.toString());
final short keep_alive = connect_message.keepAlive();
if (keep_alive > 0) {
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Apr 3 20:21:18 2013
@@ -434,7 +434,7 @@ class OpenwireProtocolHandler extends Pr
security_context.certificates = connection.certificates
security_context.user = Option(info.getUserName).map(_.toString).getOrElse(null)
security_context.password = Option(info.getPassword).map(_.toString).getOrElse(null)
- security_context.session_id = Some(info.getConnectionId.toString)
+ security_context.session_id = info.getConnectionId.toString
if( host.authenticator!=null && host.authorizer!=null ) {
suspend_read("authenticating and authorizing connect")
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Apr 3 20:21:18 2013
@@ -23,7 +23,7 @@ import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.broker._
import Buffer._
import java.lang.String
-import protocol.{ProtocolFilter2, ProtocolHandler}
+import protocol.{ProtocolFilter3, ProtocolHandler}
import security.SecurityContext
import Stomp._
import org.apache.activemq.apollo.selector.SelectorParser
@@ -504,7 +504,7 @@ class StompProtocolHandler extends Proto
case _ => (message.encoded, "protocol/"+message.codec.id())
}
message_id_counter += 1
- var headers = (MESSAGE_ID -> ascii(session_id.get+message_id_counter)) :: Nil
+ var headers = (MESSAGE_ID -> ascii(session_id+message_id_counter)) :: Nil
headers ::= (CONTENT_TYPE -> ascii(content_type))
headers ::= (CONTENT_LENGTH -> ascii(body.length().toString))
headers ::= (DESTINATION -> encode_header(destination_parser.encode_destination(delivery.sender.tail)))
@@ -690,7 +690,7 @@ class StompProtocolHandler extends Proto
var waiting_on = WAITING_ON_CLIENT_REQUEST
var config:StompDTO = _
- var protocol_filters = List[ProtocolFilter2]()
+ var protocol_filters = List[ProtocolFilter3]()
var destination_parser = Stomp.destination_parser
var protocol_convert = "full"
@@ -708,7 +708,7 @@ class StompProtocolHandler extends Proto
rc.map { dest =>
if( dest.domain.startsWith("temp-") ) {
temp_destination_map.getOrElseUpdate(dest, {
- val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: dest.path.parts
+ val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id) :: dest.path.parts
SimpleAddress(dest.domain.stripPrefix("temp-"), Path(parts))
})
} else {
@@ -732,7 +732,7 @@ class StompProtocolHandler extends Proto
val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
config = connector_config.protocols.find( _.isInstanceOf[StompDTO]).map(_.asInstanceOf[StompDTO]).getOrElse(new StompDTO)
- protocol_filters = ProtocolFilter2.create_filters(config.protocol_filters.toList, this)
+ protocol_filters = ProtocolFilter3.create_filters(config.protocol_filters.toList, this)
import OptionSupport._
Option(config.max_data_length).map(MemoryPropertyEditor.parse(_).toInt).foreach( codec.max_data_length = _ )
@@ -830,12 +830,17 @@ class StompProtocolHandler extends Proto
}
if(!protocol_filters.isEmpty) {
- filtering_sink = filtering_sink.flatMap {x=>
- var cur = Option(x)
- protocol_filters.foreach { filter =>
- cur = cur.flatMap(filter.filter_outbound(_))
+ filtering_sink = new AbstractSinkFilter[StompFrame, StompFrame]() {
+ val downstream = filtering_sink
+ def filter(value: StompFrame): StompFrame = {
+ var cur = value
+ for(filter <- protocol_filters) {
+ if( cur != null ) {
+ cur = filter.filter_outbound(cur)
+ }
+ }
+ cur
}
- cur
}
}
@@ -939,14 +944,16 @@ class StompProtocolHandler extends Proto
trace("received frame: %s", f)
val frame = if(!protocol_filters.isEmpty) {
- var cur = Option(f)
- protocol_filters.foreach { filter =>
- cur = cur.flatMap(filter.filter_inbound(_))
+ var cur = f
+ for( filter <- protocol_filters) {
+ if( cur !=null ) {
+ cur = filter.filter_inbound(cur)
+ }
+ }
+ if( cur == null ) {
+ return // dropping the frame.
}
- cur match {
- case Some(f) => f
- case None => return // dropping the frame.
- }
+ cur
} else {
f
}
@@ -1081,7 +1088,7 @@ class StompProtocolHandler extends Proto
connected_headers += SERVER->encode_header("apache-apollo/"+Broker.version)
connected_headers += HOST_ID->encode_header(host.id)
- connected_headers += SESSION->encode_header(session_id.get)
+ connected_headers += SESSION->encode_header(session_id)
val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
connected_headers += HEART_BEAT->outbound_heart_beat_header
@@ -1114,7 +1121,7 @@ class StompProtocolHandler extends Proto
async_die(headers, "")
} else {
this.host=host
- security_context.session_id = Some("%s-%x".format(this.host.config.id, this.host.session_counter.incrementAndGet))
+ security_context.session_id = "%s-%x".format(this.host.config.id, this.host.session_counter.incrementAndGet)
connection_log = host.connection_log
if( host.authenticator!=null && host.authorizer!=null ) {
suspend_read("authenticating and authorizing connect")
@@ -1349,7 +1356,7 @@ class StompProtocolHandler extends Proto
// Do we need to add the message id?
if( get( headers, MESSAGE_ID) == None ) {
message_id_counter += 1
- rc ::= (MESSAGE_ID -> ascii(session_id.get+message_id_counter))
+ rc ::= (MESSAGE_ID -> ascii(session_id+message_id_counter))
}
if( config.add_timestamp_header!=null ) {
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala Wed Apr 3 20:21:18 2013
@@ -40,7 +40,7 @@ class StompUdpProtocol extends UdpProtoc
var config:ConfigTypeDTO = _
var destination_parser = Stomp.destination_parser
- var protocol_filters = List[ProtocolFilter2]()
+ var protocol_filters = List[ProtocolFilter3]()
var message_id_counter = 0L
var default_virtual_host:VirtualHost = _
@@ -48,7 +48,7 @@ class StompUdpProtocol extends UdpProtoc
config = c.getOrElse(new ConfigTypeDTO)
import collection.JavaConversions._
default_virtual_host = broker.default_virtual_host
- protocol_filters = ProtocolFilter2.create_filters(config.protocol_filters.toList, this)
+ protocol_filters = ProtocolFilter3.create_filters(config.protocol_filters.toList, this)
// Option(config.max_data_length).map(MemoryPropertyEditor.parse(_).toInt).foreach( codec.max_data_length = _ )
@@ -119,17 +119,16 @@ class StompUdpProtocol extends UdpProtoc
try {
var frame = StompCodec.decode_frame(new Buffer(udp.buffer))
- frame = if(!protocol_filters.isEmpty) {
- var cur = Option(frame)
- protocol_filters.foreach { filter =>
- cur = cur.flatMap(filter.filter_inbound(_))
+
+ if(!protocol_filters.isEmpty) {
+ for( filter <- protocol_filters) {
+ if( frame !=null ) {
+ frame = filter.filter_inbound(frame)
+ }
}
- cur match {
- case Some(frame) => frame
- case None => return None
+ if( frame == null ) {
+ return None
}
- } else {
- frame
}
val virtual_host = get(frame.headers, HOST) match {
@@ -188,7 +187,7 @@ class StompUdpProtocol extends UdpProtoc
// Do we need to add the message id?
if( get( headers, MESSAGE_ID) == None ) {
message_id_counter += 1
- rc ::= (MESSAGE_ID -> ascii(session_id.get+message_id_counter))
+ rc ::= (MESSAGE_ID -> ascii(session_id+message_id_counter))
}
if( config.add_timestamp_header!=null ) {