You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/07/31 17:31:51 UTC

svn commit: r1508929 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/ope...

Author: chirino
Date: Wed Jul 31 15:31:51 2013
New Revision: 1508929

URL: http://svn.apache.org/r1508929
Log:
Change the any protocol so that it swaps out the codec and handler when the AnyProtocolHandler receives the ProtocolDetected command.

Also the any protocol does not pass the new codec down the the next protocol handler anymore so don't try to handle that case anymore.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1508929&r1=1508928&r2=1508929&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala Wed Jul 31 15:31:51 2013
@@ -62,7 +62,7 @@ object AnyProtocol extends BaseProtocol 
 
 }
 
-case class ProtocolDetected(id:String, codec:ProtocolCodec)
+case class ProtocolDetected(id:String)
 
 class AnyProtocolCodec(val connector:Connector) extends ProtocolCodec {
 
@@ -82,18 +82,15 @@ class AnyProtocolCodec(val connector:Con
 
 
   def read: AnyRef = {
-    if (next != null) {
-      throw new IllegalStateException
+    if (next!=null) {
+      return next.read()
     }
 
     channel.read(buffer)
     val buff = new Buffer(buffer.array(), 0, buffer.position())
     protocols.foreach {protocol =>
       if (protocol.matchesIdentification(buff)) {
-        next = protocol.createProtocolCodec(connector)
-        AnyProtocol.change_protocol_codec(transport, next)
-        next.unread(buff.toByteArray)
-        return ProtocolDetected(protocol.id, next)
+        return ProtocolDetected(protocol.id)
       }
     }
     if (buffer.position() == buffer.capacity) {
@@ -141,27 +138,32 @@ class AnyProtocolHandler extends Protoco
   def async_die(client_message:String) = connection.stop(NOOP)
 
   override def on_transport_command(command: AnyRef) = {
-  def async_die(client_message:String) = connection.stop(NOOP)
-
-    if (!command.isInstanceOf[ProtocolDetected]) {
-      throw new ProtocolException("Expected a ProtocolDetected object");
+    if( discriminated ) {
+      throw new ProtocolException("Protocol already discriminated");
     }
 
-    discriminated = true
-
-    var protocol: ProtocolDetected = command.asInstanceOf[ProtocolDetected];
-    val protocol_handler = ProtocolFactory.get(protocol.id) match {
-      case Some(x) => x.createProtocolHandler(connection.connector)
-      case None =>
-        throw new ProtocolException("No protocol handler available for protocol: " + protocol.id);
-    }
+    command match {
+      case detected:ProtocolDetected =>
+        discriminated = true
+        val protocol = ProtocolFactory.get(detected.id).getOrElse(throw new ProtocolException("No protocol handler available for protocol: " + detected.id))
+        val protocol_handler = protocol.createProtocolHandler(connection.connector)
+
+        // Swap out the protocol codec
+        val any_codec = connection.protocol_codec(classOf[AnyProtocolCodec])
+        val next = protocol.createProtocolCodec(connection.connector)
+        AnyProtocol.change_protocol_codec(connection.transport, next)
+        val buff = new Buffer(any_codec.buffer.array(), 0, any_codec.buffer.position())
+        next.unread(buff.toByteArray)
 
-     // replace the current handler with the new one.
-    connection.protocol_handler = protocol_handler
-    connection.transport.suspendRead
+        // Swap out the protocol handler.
+        connection.protocol_handler = protocol_handler
+        connection.transport.suspendRead
+        protocol_handler.set_connection(connection);
+        connection.transport.getTransportListener.onTransportConnected()
 
-    protocol_handler.set_connection(connection);
-    connection.transport.getTransportListener.onTransportConnected()
+      case _ =>
+        throw new ProtocolException("Expected a ProtocolDetected object");
+    }
   }
 
   override def on_transport_connected = {

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java?rev=1508929&r1=1508928&r2=1508929&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java Wed Jul 31 15:31:51 2013
@@ -402,10 +402,7 @@ public class MqttProtocolHandler extends
         return new UnitFn1<Object>() {
             @Override
             public void call(Object o) {
-                if (o instanceof MQTTProtocolCodec) {
-                    // this is passed on to us by the protocol discriminator
-                    // so we know which wire format is being used.
-                } else if (o instanceof MQTTFrame) {
+                if (o instanceof MQTTFrame) {
                     MQTTFrame command = (MQTTFrame) o;
                     try {
                         if (command.messageType() == CONNECT.TYPE) {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1508929&r1=1508928&r2=1508929&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Jul 31 15:31:51 2013
@@ -284,9 +284,6 @@ class OpenwireProtocolHandler extends Pr
       trace("received: %s", command)
       if (wire_format == null) {
         command match {
-          case codec: OpenwireCodec =>
-            // this is passed on to us by the protocol discriminator
-            // so we know which wire format is being used.
           case command: WireFormatInfo =>
             on_wire_format_info(command)
           case _ =>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1508929&r1=1508928&r2=1508929&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Jul 31 15:31:51 2013
@@ -989,9 +989,6 @@ class StompProtocolHandler extends Proto
     }
     try {
       command match {
-        case s:StompCodec =>
-          // this is passed on to us by the protocol discriminator
-          // so we know which wire format is being used.
         case f:StompFrame=>
 
           trace("received frame: %s", f)