You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/15 19:11:52 UTC

svn commit: r1409901 - in /activemq/activemq-apollo/trunk/apollo-amqp/src: main/scala/org/apache/activemq/apollo/amqp/ main/scala/org/apache/activemq/apollo/amqp/dto/ main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/ test/scala/org/apache/activemq/apollo/amqp/dto/

Author: chirino
Date: Thu Nov 15 18:11:51 2012
New Revision: 1409901

URL: http://svn.apache.org/viewvc?rev=1409901&view=rev
Log:
Made protocol tracing configurable and fixed bugs seen during load testing.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1409901&r1=1409900&r2=1409901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Thu Nov 15 18:11:51 2012
@@ -177,6 +177,7 @@ class AmqpProtocolHandler extends Protoc
   }
 
   var amqp_connection:AmqpTransport = _
+  var amqp_trace = false
 
   def codec = connection.transport.getProtocolCodec.asInstanceOf[AmqpProtocolCodec]
 
@@ -201,12 +202,17 @@ class AmqpProtocolHandler extends Protoc
 
     val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
     config = connector_config.protocols.find(_.isInstanceOf[AmqpDTO]).map(_.asInstanceOf[AmqpDTO]).getOrElse(new AmqpDTO)
+    amqp_trace = OptionSupport(config.trace).getOrElse(amqp_trace)
+
+    def mem_size(value:String, default:String) = MemoryPropertyEditor.parse(Option(value).getOrElse(default)).toInt
+    codec.setMaxFrameSize(mem_size(config.max_frame_size, "100M"))
+
     amqp_connection = AmqpTransport.accept(connection.transport)
     amqp_connection.setListener(amqp_listener)
-    if( false ) {
+    if( amqp_trace ) {
       amqp_connection.setProtocolTracer(new ProtocolTracer() {
         def receivedFrame(transportFrame: TransportFrame) = {
-  //        println("RECV: %s | %s".format(security_context.remote_address, transportFrame.getBody()));
+          println("RECV: %s | %s".format(security_context.remote_address, transportFrame.getBody()));
   //        connection_log.trace("RECV: %s | %s", security_context.remote_address, transportFrame.getBody());
         }
         def sentFrame(transportFrame: TransportFrame) = {
@@ -640,7 +646,7 @@ class AmqpProtocolHandler extends Protoc
 
     def process(delivery: DeliveryImpl): Unit = {
       if (!delivery.isReadable()) {
-        System.out.println("it was not readable!");
+        trace("it was not readable!");
         return;
       }
 
@@ -769,6 +775,7 @@ class AmqpProtocolHandler extends Protoc
               case _ =>
                 async_die("uknown", "Unexpected NAK from broker")
             }
+            pump_out
           }
         }
       } else {
@@ -855,78 +862,65 @@ class AmqpProtocolHandler extends Protoc
     }
 
     val redeliveries = new util.LinkedList[(Session[Delivery], Delivery)]()
-    val session_manager = new SessionSinkMux[Delivery](FullSink(), queue, Delivery, 1, buffer_size) {
+    val session_manager = new SessionSinkMux[Delivery](FullSink(), queue, Delivery, 100, buffer_size) {
       override def time_stamp = broker.now
 
-      var currentBuffer: Buffer = _;
-      var currentDelivery: DeliveryImpl = _;
-
       override def drain_overflow: Unit = {
         queue.assertExecuting()
         var pumpNeeded = false
         try {
-          while (true) {
-            while (currentBuffer != null) {
-              if (sender.getCredit > 0) {
-                val sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
-                currentBuffer.moveHead(sent);
-                val (session, apollo_delivery) = currentDelivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
-                delivered(session, apollo_delivery.size)
-                pumpNeeded = true
-                if (currentBuffer.length == 0) {
-                  if (presettle) {
-                    settle(currentDelivery, Consumed, false);
-                  } else {
-                    sender.advance();
-                  }
-                  currentBuffer = null;
-                  currentDelivery = null;
-                }
-              } else {
-                return;
-              }
-            }
-
+          while ((sender.getCredit - sender.getQueued) > 0) {
             val value = poll
             if (value == null) {
               return
+            }
+
+            val (session, apollo_delivery) = value
+            val message = if (apollo_delivery.message.codec == AmqpMessageCodec) {
+              apollo_delivery.message.asInstanceOf[AmqpMessage].decoded
             } else {
-              val (session, delivery) = value
-              val message = if (delivery.message.codec == AmqpMessageCodec) {
-                delivery.message.asInstanceOf[AmqpMessage].decoded
-              } else {
-                val (body, content_type) = protocol_convert match {
-                  case "body" => (delivery.message.getBodyAs(classOf[Buffer]), "protocol/" + delivery.message.codec.id + ";conv=body")
-                  case _ => (delivery.message.encoded, "protocol/" + delivery.message.codec.id())
-                }
-
-                message_id_counter += 1
-
-                val message = new org.apache.qpid.proton.message.Message
-                message.setMessageId(session_id.get + message_id_counter)
-                message.setBody(new Data(new Binary(body.data, body.offset, body.length)))
-                message.setContentType(content_type)
-                message.setDurable(delivery.persistent)
-                if (delivery.expiration > 0) {
-                  message.setExpiryTime(delivery.expiration)
-                }
-                message
+              val (body, content_type) = protocol_convert match {
+                case "body" => (apollo_delivery.message.getBodyAs(classOf[Buffer]), "protocol/" + apollo_delivery.message.codec.id + ";conv=body")
+                case _ => (apollo_delivery.message.encoded, "protocol/" + apollo_delivery.message.codec.id())
               }
 
-              if (delivery.redeliveries > 0) {
-                message.setDeliveryCount(delivery.redeliveries)
-                message.setFirstAcquirer(false)
-              }
+              message_id_counter += 1
 
-              currentBuffer = new AmqpMessage(null, message).encoded;
-              if (presettle) {
-                currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0).asInstanceOf[DeliveryImpl];
-              } else {
-                val tag = nextTag
-                currentDelivery = sender.delivery(tag, 0, tag.length).asInstanceOf[DeliveryImpl];
-                unsettled.put(new AsciiBuffer(tag), currentDelivery)
+              val message = new org.apache.qpid.proton.message.Message
+              message.setMessageId(session_id.get + message_id_counter)
+              message.setBody(new Data(new Binary(body.data, body.offset, body.length)))
+              message.setContentType(content_type)
+              message.setDurable(apollo_delivery.persistent)
+              if (apollo_delivery.expiration > 0) {
+                message.setExpiryTime(apollo_delivery.expiration)
               }
-              currentDelivery.setContext(value);
+              message
+            }
+
+            if (apollo_delivery.redeliveries > 0) {
+              message.setDeliveryCount(apollo_delivery.redeliveries)
+              message.setFirstAcquirer(false)
+            }
+
+            val buffer = new AmqpMessage(null, message).encoded;
+            val proton_delivery = if (presettle) {
+              sender.delivery(EMPTY_BYTE_ARRAY, 0, 0).asInstanceOf[DeliveryImpl];
+            } else {
+              val tag = nextTag
+              val proton_delivery = sender.delivery(tag, 0, tag.length).asInstanceOf[DeliveryImpl];
+              unsettled.put(new AsciiBuffer(tag), proton_delivery)
+              proton_delivery
+            }
+
+            val sent = sender.send(buffer.data, buffer.offset, buffer.length);
+            assert( sent == buffer.length )
+            delivered(session, apollo_delivery.size)
+            pumpNeeded = true
+            proton_delivery.setContext(value)
+            if (presettle) {
+              settle(proton_delivery, Consumed, false);
+            } else {
+              sender.advance();
             }
           }
         } finally {
@@ -979,8 +973,12 @@ class AmqpProtocolHandler extends Protoc
       }
     }
 
-    def settle(delivery:DeliveryImpl, ackType:DeliveryResult, incrementRedelivery:Boolean) {
-      val (session, apollo_delivery) = delivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
+    def settle(delivery:DeliveryImpl, ackType:DeliveryResult, incrementRedelivery:Boolean):Unit = {
+      val ctx = delivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
+      if( ctx==null ) {
+        return
+      }
+      val (session, apollo_delivery) = ctx
       if( incrementRedelivery ) {
         apollo_delivery.redelivered
       }
@@ -1063,6 +1061,7 @@ class AmqpProtocolHandler extends Protoc
       var next = session_manager.poll
       while( next!=null ) {
         reject(next, Undelivered)
+        next = session_manager.poll
       }
       super.dispose()
     }

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java?rev=1409901&r1=1409900&r2=1409901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java Thu Nov 15 18:11:51 2012
@@ -35,38 +35,11 @@ import java.util.List;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class AmqpDTO extends ProtocolDTO {
 
-    @XmlAttribute(name="add_user_header")
-    public String add_user_header;
+    @XmlAttribute(name="trace")
+    public Boolean trace;
 
-    /**
-     * A broker accepts connections via it's configured connectors.
-     */
-    @XmlElement(name="add_user_header")
-    public List<AddUserHeaderDTO> add_user_headers = new ArrayList<AddUserHeaderDTO>();
-
-    /**
-     * If set, it will add the configured header name with the value
-     * set the a timestamp of when the message is received.
-     */
-    @XmlAttribute(name="add_timestamp_header")
-    public String add_timestamp_header;
-
-    /**
-     * If set, the configured header will be added to message
-     * sent to consumer if the message is a redelivery.  It will be
-     * set to the number of re-deliveries that have occurred.
-     */
-    @XmlAttribute(name="add_redeliveries_header")
-    public String add_redeliveries_header;
-
-    @XmlAttribute(name="max_header_length")
-    public String max_header_length;
-
-    @XmlAttribute(name="max_headers")
-    public Integer max_headers;
-
-    @XmlAttribute(name="max_data_length")
-    public String max_data_length;
+    @XmlAttribute(name="max_frame_size")
+    public String max_frame_size;
 
     @XmlElementRef
     public List<ProtocolFilterDTO> protocol_filters = new ArrayList<ProtocolFilterDTO>();
@@ -115,14 +88,6 @@ public class AmqpDTO extends ProtocolDTO
 
         AmqpDTO amqpDTO = (AmqpDTO) o;
 
-        if (add_redeliveries_header != null ? !add_redeliveries_header.equals(amqpDTO.add_redeliveries_header) : amqpDTO.add_redeliveries_header != null)
-            return false;
-        if (add_timestamp_header != null ? !add_timestamp_header.equals(amqpDTO.add_timestamp_header) : amqpDTO.add_timestamp_header != null)
-            return false;
-        if (add_user_header != null ? !add_user_header.equals(amqpDTO.add_user_header) : amqpDTO.add_user_header != null)
-            return false;
-        if (add_user_headers != null ? !add_user_headers.equals(amqpDTO.add_user_headers) : amqpDTO.add_user_headers != null)
-            return false;
         if (any_child_wildcard != null ? !any_child_wildcard.equals(amqpDTO.any_child_wildcard) : amqpDTO.any_child_wildcard != null)
             return false;
         if (any_descendant_wildcard != null ? !any_descendant_wildcard.equals(amqpDTO.any_descendant_wildcard) : amqpDTO.any_descendant_wildcard != null)
@@ -131,12 +96,9 @@ public class AmqpDTO extends ProtocolDTO
             return false;
         if (destination_separator != null ? !destination_separator.equals(amqpDTO.destination_separator) : amqpDTO.destination_separator != null)
             return false;
-        if (die_delay != null ? !die_delay.equals(amqpDTO.die_delay) : amqpDTO.die_delay != null) return false;
-        if (max_data_length != null ? !max_data_length.equals(amqpDTO.max_data_length) : amqpDTO.max_data_length != null)
+        if (die_delay != null ? !die_delay.equals(amqpDTO.die_delay) : amqpDTO.die_delay != null)
             return false;
-        if (max_header_length != null ? !max_header_length.equals(amqpDTO.max_header_length) : amqpDTO.max_header_length != null)
-            return false;
-        if (max_headers != null ? !max_headers.equals(amqpDTO.max_headers) : amqpDTO.max_headers != null)
+        if (max_frame_size != null ? !max_frame_size.equals(amqpDTO.max_frame_size) : amqpDTO.max_frame_size != null)
             return false;
         if (path_separator != null ? !path_separator.equals(amqpDTO.path_separator) : amqpDTO.path_separator != null)
             return false;
@@ -154,6 +116,8 @@ public class AmqpDTO extends ProtocolDTO
             return false;
         if (topic_prefix != null ? !topic_prefix.equals(amqpDTO.topic_prefix) : amqpDTO.topic_prefix != null)
             return false;
+        if (trace != null ? !trace.equals(amqpDTO.trace) : amqpDTO.trace != null)
+            return false;
 
         return true;
     }
@@ -161,13 +125,8 @@ public class AmqpDTO extends ProtocolDTO
     @Override
     public int hashCode() {
         int result = super.hashCode();
-        result = 31 * result + (add_user_header != null ? add_user_header.hashCode() : 0);
-        result = 31 * result + (add_user_headers != null ? add_user_headers.hashCode() : 0);
-        result = 31 * result + (add_timestamp_header != null ? add_timestamp_header.hashCode() : 0);
-        result = 31 * result + (add_redeliveries_header != null ? add_redeliveries_header.hashCode() : 0);
-        result = 31 * result + (max_header_length != null ? max_header_length.hashCode() : 0);
-        result = 31 * result + (max_headers != null ? max_headers.hashCode() : 0);
-        result = 31 * result + (max_data_length != null ? max_data_length.hashCode() : 0);
+        result = 31 * result + (trace != null ? trace.hashCode() : 0);
+        result = 31 * result + (max_frame_size != null ? max_frame_size.hashCode() : 0);
         result = 31 * result + (protocol_filters != null ? protocol_filters.hashCode() : 0);
         result = 31 * result + (queue_prefix != null ? queue_prefix.hashCode() : 0);
         result = 31 * result + (topic_prefix != null ? topic_prefix.hashCode() : 0);

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java?rev=1409901&r1=1409900&r2=1409901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java Thu Nov 15 18:11:51 2012
@@ -99,4 +99,11 @@ public class AmqpProtocolCodec extends A
         nextDecodeAction = initialDecodeAction();
     }
 
+    public int getMaxFrameSize() {
+        return maxFrameSize;
+    }
+
+    public void setMaxFrameSize(int maxFrameSize) {
+        this.maxFrameSize = maxFrameSize;
+    }
 }

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java?rev=1409901&r1=1409900&r2=1409901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java Thu Nov 15 18:11:51 2012
@@ -46,13 +46,6 @@ public class XmlCodecTest {
         assertEquals(1, connector.protocols.size());
         ProtocolDTO amqp = connector.protocols.get(0);
         assertTrue(amqp instanceof AmqpDTO);
-        assertEquals("JMSXUserID", ((AmqpDTO) amqp).add_user_header);
-
-        List<AddUserHeaderDTO> add_user_headers = ((AmqpDTO) amqp).add_user_headers;
-        assertEquals(2, add_user_headers.size());
-        assertEquals("GroupId", add_user_headers.get(0).name);
-        assertEquals("UserId", add_user_headers.get(1).name);
-        assertEquals("UserPrincipal", add_user_headers.get(1).kind);
 
     }