You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/04/03 22:21:19 UTC

svn commit: r1464191 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protoc...

Author: chirino
Date: Wed Apr  3 20:21:18 2013
New Revision: 1464191

URL: http://svn.apache.org/r1464191
Log:
Removing Option from some interfaces to further reduce dependency on scala.

Removed:
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Wed Apr  3 20:21:18 2013
@@ -329,7 +329,7 @@ class AmqpProtocolHandler extends Protoc
 
 
     def processConnectionOpen(conn: engine.Connection, onComplete: Task) {
-      security_context.remote_application = Some(conn.getRemoteContainer())
+      security_context.remote_application = conn.getRemoteContainer()
 
       suspend_read("host lookup")
       broker.dispatch_queue {
@@ -350,7 +350,7 @@ class AmqpProtocolHandler extends Protoc
             connection_log = virtual_host.connection_log
             host = virtual_host
             proton.setLocalContainerId(virtual_host.id)
-            security_context.session_id = Some("%s-%x".format(host.config.id, host.session_counter.incrementAndGet))
+            security_context.session_id = "%s-%x".format(host.config.id, host.session_counter.incrementAndGet)
             //                proton.open()
             //                callback.onSuccess(response)
             if (virtual_host.authenticator != null && virtual_host.authorizer != null) {
@@ -693,7 +693,7 @@ class AmqpProtocolHandler extends Protoc
       dest =>
         if (dest.domain.startsWith("temp-")) {
           temp_destination_map.getOrElseUpdate(dest, {
-            val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: dest.path.parts
+            val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id) :: dest.path.parts
             SimpleAddress(dest.domain.stripPrefix("temp-"), Path(parts))
           })
         } else {
@@ -706,7 +706,7 @@ class AmqpProtocolHandler extends Protoc
     var dynamic = target.getDynamic()
     if (dynamic) {
       temp_dest_counter += 1
-      val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: LiteralPart(temp_dest_counter.toString) :: Nil
+      val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id) :: LiteralPart(temp_dest_counter.toString) :: Nil
       val rc = SimpleAddress("queue", Path(parts))
       val actual = new Target();
       var address = destination_parser.encode_destination(rc)
@@ -728,7 +728,7 @@ class AmqpProtocolHandler extends Protoc
     var dynamic = source.getDynamic()
     if (dynamic) {
       temp_dest_counter += 1
-      val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: LiteralPart(temp_dest_counter.toString) :: Nil
+      val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id) :: LiteralPart(temp_dest_counter.toString) :: Nil
       val rc = SimpleAddress("queue", Path(parts))
       val actual = new Source();
       var address = destination_parser.encode_destination(rc)
@@ -817,7 +817,7 @@ class AmqpProtocolHandler extends Protoc
       } else {
         dm.getFooter.getValue.asInstanceOf[java.util.Map[AnyRef,AnyRef]]
       }
-      footer_map.put(ORIGIN, session_id.get)
+      footer_map.put(ORIGIN, session_id)
       val message = new AmqpMessage(null, dm)
 
       val d = new Delivery
@@ -941,7 +941,7 @@ class AmqpProtocolHandler extends Protoc
       if( delivery.message.codec eq AmqpMessageCodec ) {
         if ( noLocal ) {
           val origin = delivery.message.asInstanceOf[AmqpMessage].getFooterProperty(ORIGIN)
-          if ( origin == session_id.get ) {
+          if ( origin == session_id ) {
             return false
           }
         }
@@ -1045,7 +1045,7 @@ class AmqpProtocolHandler extends Protoc
               message_id_counter += 1
 
               val message = new MessageImpl
-              message.setMessageId(session_id.get + message_id_counter)
+              message.setMessageId(session_id + message_id_counter)
               message.setBody(new Data(new Binary(body.data, body.offset, body.length)))
               message.setContentType(content_type)
               message.setDurable(apollo_delivery.persistent)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Apr  3 20:21:18 2013
@@ -116,7 +116,7 @@ class BrokerConnection(var connector: Co
 
   var protocol_handler: ProtocolHandler = null;
 
-  def session_id = Option(protocol_handler).flatMap(_.session_id)
+  def session_id = Option(protocol_handler).flatMap(x=> Option(x.session_id))
   override def toString = "id: "+id.toString
 
   protected override  def _start(on_completed:Task) = {
@@ -164,7 +164,7 @@ class BrokerConnection(var connector: Co
     result.connector = connector.id
     result.remote_address = Option(transport.getRemoteAddress).map(_.toString).getOrElse(null)
     result.local_address = Option(transport.getLocalAddress).map(_.toString).getOrElse(null)
-    result.protocol_session_id = protocol_handler.session_id.getOrElse(null)
+    result.protocol_session_id = protocol_handler.session_id
     val wf = transport.getProtocolCodec
     if( wf!=null ) {
       result.write_counter = wf.getWriteCounter

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed Apr  3 20:21:18 2013
@@ -311,8 +311,8 @@ class LocalRouter(val virtual_host:Virtu
       for(dest <- get_destination_matches(address.path)) {
         if( is_temp(address) ) {
           val owner = temp_owner(address).get
-          for( connection <- security.session_id) {
-            if( (virtual_host.broker.id, connection) != owner ) {
+          if( security.session_id !=null ) {
+            if( (virtual_host.broker.id, security.session_id) != owner ) {
               return Some("Not authorized to destroy the temp %s '%s'. Principals=%s".format(dest.resource_kind.id, dest.id, security.principal_dump))
             }
           }
@@ -338,8 +338,8 @@ class LocalRouter(val virtual_host:Virtu
       if( is_temp(bind_address) ) {
         temp_owner(bind_address) match {
           case Some(owner) =>
-            for( connection <- security.session_id) {
-              if( (virtual_host.broker.id, connection) != owner ) {
+            if( security.session_id!=null) {
+              if( (virtual_host.broker.id, security.session_id) != owner ) {
                 return Some("Not authorized to receive from the temporary destination. Principals=%s".format(security.principal_dump))
               }
             }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Apr  3 20:21:18 2013
@@ -74,6 +74,26 @@ abstract class Sink[T] {
   }
 }
 
+abstract class AbstractSinkFilter[Y, T <: Object] extends Sink[Y] {
+  def downstream:Sink[T]
+  def refiller:Task = downstream.refiller
+  def refiller_=(value:Task) { downstream.refiller=value }
+  def full: Boolean = downstream.full
+
+  def offer(value:Y) = {
+    if( full ) {
+      false
+    } else {
+      val opt = filter(value)
+      if( opt !=null ) {
+        downstream.offer(opt)
+      }
+      true
+    }
+  }
+  def filter(value:Y):T
+}
+
 case class FullSink[T]() extends Sink[T] {
   def refiller:Task = null
   def refiller_=(value:Task) = {}

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala Wed Apr  3 20:21:18 2013
@@ -135,8 +135,7 @@ class AnyProtocolHandler extends Protoco
 
   var discriminated = false
 
-  def session_id = None
-
+  def session_id = null
   var config:DetectDTO = _
 
   override def on_transport_command(command: AnyRef) = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala Wed Apr  3 20:21:18 2013
@@ -84,7 +84,7 @@ trait ProtocolHandler {
 
   def protocol:String
 
-  def session_id:Option[String]
+  def session_id: String
 
   var connection:BrokerConnection = null;
   def defer(func: =>Unit) = connection.defer(func)
@@ -110,20 +110,20 @@ trait ProtocolHandler {
 
 abstract class AbstractProtocolHandler extends ProtocolHandler
 
-@deprecated(message="Please use the ProtocolFilter2 interface instead", since="1.3")
+@deprecated(message="Please use the ProtocolFilter3 interface instead", since="1.3")
 trait ProtocolFilter {
   def filter[T](command: T):T
 }
 
-object ProtocolFilter2Factory {
+object ProtocolFilter3Factory {
 
   val providers = new ClassFinder[Provider]("META-INF/services/org.apache.activemq.apollo/protocol-filter-factory.index",classOf[Provider])
 
   trait Provider {
-    def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter2
+    def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter3
   }
 
-  def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter2 = {
+  def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter3 = {
     for( p <- providers.singletons ) {
       val rc = p.create(dto, handler)
       if( rc!=null ) {
@@ -134,15 +134,19 @@ object ProtocolFilter2Factory {
   }
 }
 
-object SimpleProtocolFilter2Factory extends ProtocolFilter2Factory.Provider {
-  def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter2 = dto match {
+object SimpleProtocolFilter3Factory extends ProtocolFilter3Factory.Provider {
+  def create( dto:ProtocolFilterDTO, handler:ProtocolHandler ):ProtocolFilter3 = dto match {
     case dto:SimpleProtocolFilterDTO =>
       val instance = Broker.class_loader.loadClass(dto.kind).newInstance().asInstanceOf[AnyRef]
       val filter = instance match {
-        case self:ProtocolFilter2 => self
-        case self:ProtocolFilter => new ProtocolFilter2() {
-          override def filter_inbound[T](command: T): Option[T] = Some(self.filter(command))
-          override def filter_outbound[T](command: T): Option[T] = Some(command)
+        case self:ProtocolFilter3 => self
+        case self:ProtocolFilter2 => new ProtocolFilter3() {
+          override def filter_inbound[T<:Object](command: T) = self.filter_inbound(command);
+          override def filter_outbound[T<:Object](command: T) = self.filter_outbound(command);
+        }
+        case self:ProtocolFilter => new ProtocolFilter3() {
+          override def filter_inbound[T<:Object](command: T) = self.filter(command)
+          override def filter_outbound[T<:Object](command: T) = command
         }
         case null => null
         case _ => throw new IllegalArgumentException("Invalid protocol filter type: "+instance.getClass)
@@ -163,13 +167,13 @@ object SimpleProtocolFilter2Factory exte
   }
 }
 
-object ProtocolFilter2 {
-  def create_filters(dtos:java.util.List[ProtocolFilterDTO], handler:ProtocolHandler):java.util.List[ProtocolFilter2] = {
+object ProtocolFilter3 {
+  def create_filters(dtos:java.util.List[ProtocolFilterDTO], handler:ProtocolHandler):java.util.List[ProtocolFilter3] = {
     import collection.JavaConversions._
     collection.JavaConversions.seqAsJavaList(create_filters(dtos.toList, handler));
   }
-  def create_filters(dtos:List[ProtocolFilterDTO], handler:ProtocolHandler):List[ProtocolFilter2] = {
-    dtos.map(ProtocolFilter2Factory.create(_, handler))
+  def create_filters(dtos:List[ProtocolFilterDTO], handler:ProtocolHandler):List[ProtocolFilter3] = {
+    dtos.map(ProtocolFilter3Factory.create(_, handler))
   }
 }
 
@@ -177,18 +181,38 @@ object ProtocolFilter2 {
  * A Protocol filter can filter frames being sent/received to and from a client.  It can modify
  * the frame or even drop it.
  */
+@deprecated(message="Please use the ProtocolFilter3 interface instead", since="1.7")
 abstract class ProtocolFilter2 {
 
   /**
    * Filters a command frame received from a client.
    * returns None if the filter wants to drop the frame.
    */
-  def filter_inbound[T](frame: T):Option[T]
+  def filter_inbound[T](frame: T):T
+
+  /**
+   * Filters a command frame being sent client.
+   * returns None if the filter wants to drop the frame.
+   */
+  def filter_outbound[T](frame: T):T
+}
+
+/**
+ * A Protocol filter can filter frames being sent/received to and from a client.  It can modify
+ * the frame or even drop it.
+ */
+abstract class ProtocolFilter3 {
+
+  /**
+   * Filters a command frame received from a client.
+   * returns None if the filter wants to drop the frame.
+   */
+  def filter_inbound[T<:Object](frame: T):T
 
   /**
    * Filters a command frame being sent client.
    * returns None if the filter wants to drop the frame.
    */
-  def filter_outbound[T](frame: T):Option[T]
+  def filter_outbound[T<:Object](frame: T):T
 }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala Wed Apr  3 20:21:18 2013
@@ -124,7 +124,8 @@ abstract class UdpProtocolHandler extend
   def configClass:Class[ConfigTypeDTO]
 
   def protocol = "udp"
-  var session_id:Option[String] = None
+  var session_id:String = null
+
 
   var buffer_size = 640*1024
   var connection_log:Log = _
@@ -146,7 +147,7 @@ abstract class UdpProtocolHandler extend
   override def on_transport_connected = {
     connection.transport.resumeRead
     import collection.JavaConversions._
-    session_id = Some("%s-%x".format(broker.default_virtual_host.config.id, broker.default_virtual_host.session_counter.incrementAndGet))
+    session_id = "%s-%x".format(broker.default_virtual_host.config.id, broker.default_virtual_host.session_counter.incrementAndGet)
 
     configure(connection.connector.config match {
       case connector_config:AcceptingConnectorDTO =>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala Wed Apr  3 20:21:18 2013
@@ -41,8 +41,8 @@ class SecurityContext {
   var local_address:SocketAddress = _
   var remote_address:SocketAddress = _
   var login_context:LoginContext = _
-  var session_id:Option[String] = None
-  var remote_application:Option[String] = None
+  var session_id:String = null
+  var remote_application:String = null
 
   case class Key(user:String,
     password:String,

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java Wed Apr  3 20:21:18 2013
@@ -18,8 +18,8 @@ package org.apache.activemq.apollo.mqtt;
 
 import org.apache.activemq.apollo.broker.*;
 import org.apache.activemq.apollo.broker.protocol.AbstractProtocolHandler;
-import org.apache.activemq.apollo.broker.protocol.ProtocolFilter2;
-import org.apache.activemq.apollo.broker.protocol.ProtocolFilter2$;
+import org.apache.activemq.apollo.broker.protocol.ProtocolFilter3;
+import org.apache.activemq.apollo.broker.protocol.ProtocolFilter3$;
 import org.apache.activemq.apollo.broker.security.SecurityContext;
 import org.apache.activemq.apollo.dto.AcceptingConnectorDTO;
 import org.apache.activemq.apollo.dto.ProtocolDTO;
@@ -31,7 +31,6 @@ import org.fusesource.hawtdispatch.Dispa
 import org.fusesource.hawtdispatch.Task;
 import org.fusesource.hawtdispatch.transport.HeartBeatMonitor;
 import org.fusesource.mqtt.codec.*;
-import scala.Option;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -111,14 +110,14 @@ public class MqttProtocolHandler extends
         return destination_parser;
     }
 
-    final ArrayList<ProtocolFilter2> protocol_filters = new ArrayList<ProtocolFilter2>();
+    final ArrayList<ProtocolFilter3> protocol_filters = new ArrayList<ProtocolFilter3>();
 
     /////////////////////////////////////////////////////////////////////
     //
     // Bits related setting up a client connection
     //
     /////////////////////////////////////////////////////////////////////
-    public Option<String> session_id() {
+    public String session_id() {
         return security_context.session_id();
     }
 
@@ -146,7 +145,7 @@ public class MqttProtocolHandler extends
         codec.setMaxMessageLength(get(config.max_message_length, codec.getMaxMessageLength()));
 
         protocol_filters.clear();
-        protocol_filters.addAll(ProtocolFilter2$.MODULE$.create_filters(config.protocol_filters, this));
+        protocol_filters.addAll(ProtocolFilter3$.MODULE$.create_filters(config.protocol_filters, this));
 
         security_context.local_address_$eq(connection().transport().getLocalAddress());
         security_context.remote_address_$eq(connection().transport().getRemoteAddress());
@@ -172,18 +171,26 @@ public class MqttProtocolHandler extends
         };
 
         if (!protocol_filters.isEmpty()) {
-            filtering_sink = filtering_sink.flatMap(toScala(new Fn1<Request, Option<Request>>() {
+            final Sink<Request> downstream  = filtering_sink;
+            filtering_sink = new AbstractSinkFilter<Request, Request>() {
+
+                @Override
+                public Sink<Request> downstream() {
+                    return downstream;
+                }
+
                 @Override
-                public Option<Request> apply(Request x) {
-                    Option<Request> cur = some(x);
-                    for (ProtocolFilter2 filter : protocol_filters) {
-                        if (cur.isDefined()) {
-                            cur = filter.filter_outbound(cur.get());
+                public Request filter(Request value) {
+                    Request cur = value;
+                    for (ProtocolFilter3 filter : protocol_filters) {
+                        cur = filter.filter_outbound(cur);
+                        if(cur == null ) {
+                            break;
                         }
                     }
                     return cur;
                 }
-            }));
+            };
         }
         sink_manager = new SinkMux<Request>(filtering_sink);
         connection_sink = new OverflowSink(sink_manager.open());
@@ -360,11 +367,9 @@ public class MqttProtocolHandler extends
     public void on_transport_command(Object command) {
         try {
             if (!protocol_filters.isEmpty()) {
-                for (ProtocolFilter2 filter : protocol_filters) {
-                    Option<Object> opt = filter.filter_inbound(command);
-                    if (opt.isDefined()) {
-                        command = opt.get();
-                    } else {
+                for (ProtocolFilter3 filter : protocol_filters) {
+                    command = filter.filter_inbound(command);
+                    if (command==null) {
                         return; // dropping the frame.
                     }
                 }
@@ -432,7 +437,7 @@ public class MqttProtocolHandler extends
         UTF8Buffer client_id = connect_message.clientId();
         security_context.user_$eq(Scala2Java.toString(connect_message.userName()));
         security_context.password_$eq(Scala2Java.toString(connect_message.password()));
-        security_context.session_id_$eq(Scala2Java.some(client_id.toString()));
+        security_context.session_id_$eq(client_id.toString());
 
         final short keep_alive = connect_message.keepAlive();
         if (keep_alive > 0) {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Apr  3 20:21:18 2013
@@ -434,7 +434,7 @@ class OpenwireProtocolHandler extends Pr
       security_context.certificates = connection.certificates
       security_context.user = Option(info.getUserName).map(_.toString).getOrElse(null)
       security_context.password = Option(info.getPassword).map(_.toString).getOrElse(null)
-      security_context.session_id = Some(info.getConnectionId.toString)
+      security_context.session_id = info.getConnectionId.toString
 
       if( host.authenticator!=null &&  host.authorizer!=null ) {
         suspend_read("authenticating and authorizing connect")

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Apr  3 20:21:18 2013
@@ -23,7 +23,7 @@ import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.broker._
 import Buffer._
 import java.lang.String
-import protocol.{ProtocolFilter2, ProtocolHandler}
+import protocol.{ProtocolFilter3, ProtocolHandler}
 import security.SecurityContext
 import Stomp._
 import org.apache.activemq.apollo.selector.SelectorParser
@@ -504,7 +504,7 @@ class StompProtocolHandler extends Proto
             case _ => (message.encoded, "protocol/"+message.codec.id())
           }
           message_id_counter += 1
-          var headers =  (MESSAGE_ID -> ascii(session_id.get+message_id_counter)) :: Nil
+          var headers =  (MESSAGE_ID -> ascii(session_id+message_id_counter)) :: Nil
           headers ::= (CONTENT_TYPE -> ascii(content_type))
           headers ::= (CONTENT_LENGTH -> ascii(body.length().toString))
           headers ::= (DESTINATION -> encode_header(destination_parser.encode_destination(delivery.sender.tail)))
@@ -690,7 +690,7 @@ class StompProtocolHandler extends Proto
   var waiting_on = WAITING_ON_CLIENT_REQUEST
   var config:StompDTO = _
 
-  var protocol_filters = List[ProtocolFilter2]()
+  var protocol_filters = List[ProtocolFilter3]()
 
   var destination_parser = Stomp.destination_parser
   var protocol_convert = "full"
@@ -708,7 +708,7 @@ class StompProtocolHandler extends Proto
     rc.map { dest =>
       if( dest.domain.startsWith("temp-") ) {
         temp_destination_map.getOrElseUpdate(dest, {
-          val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: dest.path.parts
+          val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id) :: dest.path.parts
           SimpleAddress(dest.domain.stripPrefix("temp-"), Path(parts))
         })
       } else {
@@ -732,7 +732,7 @@ class StompProtocolHandler extends Proto
     val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
     config = connector_config.protocols.find( _.isInstanceOf[StompDTO]).map(_.asInstanceOf[StompDTO]).getOrElse(new StompDTO)
 
-    protocol_filters = ProtocolFilter2.create_filters(config.protocol_filters.toList, this)
+    protocol_filters = ProtocolFilter3.create_filters(config.protocol_filters.toList, this)
 
     import OptionSupport._
     Option(config.max_data_length).map(MemoryPropertyEditor.parse(_).toInt).foreach( codec.max_data_length = _ )
@@ -830,12 +830,17 @@ class StompProtocolHandler extends Proto
     }
 
     if(!protocol_filters.isEmpty) {
-      filtering_sink = filtering_sink.flatMap {x=>
-        var cur = Option(x)
-        protocol_filters.foreach { filter =>
-          cur = cur.flatMap(filter.filter_outbound(_))
+      filtering_sink = new AbstractSinkFilter[StompFrame, StompFrame]() {
+        val downstream = filtering_sink
+        def filter(value: StompFrame): StompFrame = {
+          var cur = value
+          for(filter <- protocol_filters) {
+            if( cur != null ) {
+              cur = filter.filter_outbound(cur)
+            }
+          }
+          cur
         }
-        cur
       }
     }
 
@@ -939,14 +944,16 @@ class StompProtocolHandler extends Proto
           trace("received frame: %s", f)
 
           val frame = if(!protocol_filters.isEmpty) {
-            var cur = Option(f)
-            protocol_filters.foreach { filter =>
-              cur = cur.flatMap(filter.filter_inbound(_))
+            var cur = f
+            for( filter <- protocol_filters) {
+              if( cur !=null ) {
+                cur = filter.filter_inbound(cur)
+              }
+            }
+            if( cur == null ) {
+              return // dropping the frame.
             }
-            cur match {
-              case Some(f) => f
-              case None => return // dropping the frame.
-            } 
+            cur
           } else {
             f
           }
@@ -1081,7 +1088,7 @@ class StompProtocolHandler extends Proto
 
       connected_headers += SERVER->encode_header("apache-apollo/"+Broker.version)
       connected_headers += HOST_ID->encode_header(host.id)
-      connected_headers += SESSION->encode_header(session_id.get)
+      connected_headers += SESSION->encode_header(session_id)
 
       val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
       connected_headers += HEART_BEAT->outbound_heart_beat_header
@@ -1114,7 +1121,7 @@ class StompProtocolHandler extends Proto
           async_die(headers, "")
         } else {
           this.host=host
-          security_context.session_id = Some("%s-%x".format(this.host.config.id, this.host.session_counter.incrementAndGet))
+          security_context.session_id = "%s-%x".format(this.host.config.id, this.host.session_counter.incrementAndGet)
           connection_log = host.connection_log
           if( host.authenticator!=null &&  host.authorizer!=null ) {
             suspend_read("authenticating and authorizing connect")
@@ -1349,7 +1356,7 @@ class StompProtocolHandler extends Proto
     // Do we need to add the message id?
     if( get( headers, MESSAGE_ID) == None ) {
       message_id_counter += 1
-      rc ::= (MESSAGE_ID -> ascii(session_id.get+message_id_counter))
+      rc ::= (MESSAGE_ID -> ascii(session_id+message_id_counter))
     }
 
     if( config.add_timestamp_header!=null ) {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala?rev=1464191&r1=1464190&r2=1464191&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala Wed Apr  3 20:21:18 2013
@@ -40,7 +40,7 @@ class StompUdpProtocol extends UdpProtoc
 
     var config:ConfigTypeDTO = _
     var destination_parser = Stomp.destination_parser
-    var protocol_filters = List[ProtocolFilter2]()
+    var protocol_filters = List[ProtocolFilter3]()
     var message_id_counter = 0L
     var default_virtual_host:VirtualHost = _
 
@@ -48,7 +48,7 @@ class StompUdpProtocol extends UdpProtoc
       config = c.getOrElse(new ConfigTypeDTO)
       import collection.JavaConversions._
       default_virtual_host = broker.default_virtual_host
-      protocol_filters = ProtocolFilter2.create_filters(config.protocol_filters.toList, this)
+      protocol_filters = ProtocolFilter3.create_filters(config.protocol_filters.toList, this)
 
 
 //      Option(config.max_data_length).map(MemoryPropertyEditor.parse(_).toInt).foreach( codec.max_data_length = _ )
@@ -119,17 +119,16 @@ class StompUdpProtocol extends UdpProtoc
 
       try {
         var frame = StompCodec.decode_frame(new Buffer(udp.buffer))
-        frame = if(!protocol_filters.isEmpty) {
-          var cur = Option(frame)
-          protocol_filters.foreach { filter =>
-            cur = cur.flatMap(filter.filter_inbound(_))
+
+        if(!protocol_filters.isEmpty) {
+          for( filter <- protocol_filters) {
+            if( frame !=null ) {
+              frame = filter.filter_inbound(frame)
+            }
           }
-          cur match {
-            case Some(frame) => frame
-            case None => return None
+          if( frame == null ) {
+            return None
           }
-        } else {
-          frame
         }
 
         val virtual_host = get(frame.headers, HOST) match {
@@ -188,7 +187,7 @@ class StompUdpProtocol extends UdpProtoc
       // Do we need to add the message id?
       if( get( headers, MESSAGE_ID) == None ) {
         message_id_counter += 1
-        rc ::= (MESSAGE_ID -> ascii(session_id.get+message_id_counter))
+        rc ::= (MESSAGE_ID -> ascii(session_id+message_id_counter))
       }
 
       if( config.add_timestamp_header!=null ) {