You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/15 19:11:52 UTC
svn commit: r1409901 - in /activemq/activemq-apollo/trunk/apollo-amqp/src:
main/scala/org/apache/activemq/apollo/amqp/
main/scala/org/apache/activemq/apollo/amqp/dto/
main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/
test/scala/org/apache/a...
Author: chirino
Date: Thu Nov 15 18:11:51 2012
New Revision: 1409901
URL: http://svn.apache.org/viewvc?rev=1409901&view=rev
Log:
Made protocol tracing configurable, fixing bugs seen during load testing.
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1409901&r1=1409900&r2=1409901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Thu Nov 15 18:11:51 2012
@@ -177,6 +177,7 @@ class AmqpProtocolHandler extends Protoc
}
var amqp_connection:AmqpTransport = _
+ var amqp_trace = false
def codec = connection.transport.getProtocolCodec.asInstanceOf[AmqpProtocolCodec]
@@ -201,12 +202,17 @@ class AmqpProtocolHandler extends Protoc
val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
config = connector_config.protocols.find(_.isInstanceOf[AmqpDTO]).map(_.asInstanceOf[AmqpDTO]).getOrElse(new AmqpDTO)
+ amqp_trace = OptionSupport(config.trace).getOrElse(amqp_trace)
+
+ def mem_size(value:String, default:String) = MemoryPropertyEditor.parse(Option(value).getOrElse(default)).toInt
+ codec.setMaxFrameSize(mem_size(config.max_frame_size, "100M"))
+
amqp_connection = AmqpTransport.accept(connection.transport)
amqp_connection.setListener(amqp_listener)
- if( false ) {
+ if( amqp_trace ) {
amqp_connection.setProtocolTracer(new ProtocolTracer() {
def receivedFrame(transportFrame: TransportFrame) = {
- // println("RECV: %s | %s".format(security_context.remote_address, transportFrame.getBody()));
+ println("RECV: %s | %s".format(security_context.remote_address, transportFrame.getBody()));
// connection_log.trace("RECV: %s | %s", security_context.remote_address, transportFrame.getBody());
}
def sentFrame(transportFrame: TransportFrame) = {
@@ -640,7 +646,7 @@ class AmqpProtocolHandler extends Protoc
def process(delivery: DeliveryImpl): Unit = {
if (!delivery.isReadable()) {
- System.out.println("it was not readable!");
+ trace("it was not readable!");
return;
}
@@ -769,6 +775,7 @@ class AmqpProtocolHandler extends Protoc
case _ =>
async_die("uknown", "Unexpected NAK from broker")
}
+ pump_out
}
}
} else {
@@ -855,78 +862,65 @@ class AmqpProtocolHandler extends Protoc
}
val redeliveries = new util.LinkedList[(Session[Delivery], Delivery)]()
- val session_manager = new SessionSinkMux[Delivery](FullSink(), queue, Delivery, 1, buffer_size) {
+ val session_manager = new SessionSinkMux[Delivery](FullSink(), queue, Delivery, 100, buffer_size) {
override def time_stamp = broker.now
- var currentBuffer: Buffer = _;
- var currentDelivery: DeliveryImpl = _;
-
override def drain_overflow: Unit = {
queue.assertExecuting()
var pumpNeeded = false
try {
- while (true) {
- while (currentBuffer != null) {
- if (sender.getCredit > 0) {
- val sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
- currentBuffer.moveHead(sent);
- val (session, apollo_delivery) = currentDelivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
- delivered(session, apollo_delivery.size)
- pumpNeeded = true
- if (currentBuffer.length == 0) {
- if (presettle) {
- settle(currentDelivery, Consumed, false);
- } else {
- sender.advance();
- }
- currentBuffer = null;
- currentDelivery = null;
- }
- } else {
- return;
- }
- }
-
+ while ((sender.getCredit - sender.getQueued) > 0) {
val value = poll
if (value == null) {
return
+ }
+
+ val (session, apollo_delivery) = value
+ val message = if (apollo_delivery.message.codec == AmqpMessageCodec) {
+ apollo_delivery.message.asInstanceOf[AmqpMessage].decoded
} else {
- val (session, delivery) = value
- val message = if (delivery.message.codec == AmqpMessageCodec) {
- delivery.message.asInstanceOf[AmqpMessage].decoded
- } else {
- val (body, content_type) = protocol_convert match {
- case "body" => (delivery.message.getBodyAs(classOf[Buffer]), "protocol/" + delivery.message.codec.id + ";conv=body")
- case _ => (delivery.message.encoded, "protocol/" + delivery.message.codec.id())
- }
-
- message_id_counter += 1
-
- val message = new org.apache.qpid.proton.message.Message
- message.setMessageId(session_id.get + message_id_counter)
- message.setBody(new Data(new Binary(body.data, body.offset, body.length)))
- message.setContentType(content_type)
- message.setDurable(delivery.persistent)
- if (delivery.expiration > 0) {
- message.setExpiryTime(delivery.expiration)
- }
- message
+ val (body, content_type) = protocol_convert match {
+ case "body" => (apollo_delivery.message.getBodyAs(classOf[Buffer]), "protocol/" + apollo_delivery.message.codec.id + ";conv=body")
+ case _ => (apollo_delivery.message.encoded, "protocol/" + apollo_delivery.message.codec.id())
}
- if (delivery.redeliveries > 0) {
- message.setDeliveryCount(delivery.redeliveries)
- message.setFirstAcquirer(false)
- }
+ message_id_counter += 1
- currentBuffer = new AmqpMessage(null, message).encoded;
- if (presettle) {
- currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0).asInstanceOf[DeliveryImpl];
- } else {
- val tag = nextTag
- currentDelivery = sender.delivery(tag, 0, tag.length).asInstanceOf[DeliveryImpl];
- unsettled.put(new AsciiBuffer(tag), currentDelivery)
+ val message = new org.apache.qpid.proton.message.Message
+ message.setMessageId(session_id.get + message_id_counter)
+ message.setBody(new Data(new Binary(body.data, body.offset, body.length)))
+ message.setContentType(content_type)
+ message.setDurable(apollo_delivery.persistent)
+ if (apollo_delivery.expiration > 0) {
+ message.setExpiryTime(apollo_delivery.expiration)
}
- currentDelivery.setContext(value);
+ message
+ }
+
+ if (apollo_delivery.redeliveries > 0) {
+ message.setDeliveryCount(apollo_delivery.redeliveries)
+ message.setFirstAcquirer(false)
+ }
+
+ val buffer = new AmqpMessage(null, message).encoded;
+ val proton_delivery = if (presettle) {
+ sender.delivery(EMPTY_BYTE_ARRAY, 0, 0).asInstanceOf[DeliveryImpl];
+ } else {
+ val tag = nextTag
+ val proton_delivery = sender.delivery(tag, 0, tag.length).asInstanceOf[DeliveryImpl];
+ unsettled.put(new AsciiBuffer(tag), proton_delivery)
+ proton_delivery
+ }
+
+ val sent = sender.send(buffer.data, buffer.offset, buffer.length);
+ assert( sent == buffer.length )
+ delivered(session, apollo_delivery.size)
+ pumpNeeded = true
+ proton_delivery.setContext(value)
+ if (presettle) {
+ settle(proton_delivery, Consumed, false);
+ } else {
+ sender.advance();
}
}
} finally {
@@ -979,8 +973,12 @@ class AmqpProtocolHandler extends Protoc
}
}
- def settle(delivery:DeliveryImpl, ackType:DeliveryResult, incrementRedelivery:Boolean) {
- val (session, apollo_delivery) = delivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
+ def settle(delivery:DeliveryImpl, ackType:DeliveryResult, incrementRedelivery:Boolean):Unit = {
+ val ctx = delivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
+ if( ctx==null ) {
+ return
+ }
+ val (session, apollo_delivery) = ctx
if( incrementRedelivery ) {
apollo_delivery.redelivered
}
@@ -1063,6 +1061,7 @@ class AmqpProtocolHandler extends Protoc
var next = session_manager.poll
while( next!=null ) {
reject(next, Undelivered)
+ next = session_manager.poll
}
super.dispose()
}
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java?rev=1409901&r1=1409900&r2=1409901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java Thu Nov 15 18:11:51 2012
@@ -35,38 +35,11 @@ import java.util.List;
@JsonIgnoreProperties(ignoreUnknown = true)
public class AmqpDTO extends ProtocolDTO {
- @XmlAttribute(name="add_user_header")
- public String add_user_header;
+ @XmlAttribute(name="trace")
+ public Boolean trace;
- /**
- * A broker accepts connections via it's configured connectors.
- */
- @XmlElement(name="add_user_header")
- public List<AddUserHeaderDTO> add_user_headers = new ArrayList<AddUserHeaderDTO>();
-
- /**
- * If set, it will add the configured header name with the value
- * set the a timestamp of when the message is received.
- */
- @XmlAttribute(name="add_timestamp_header")
- public String add_timestamp_header;
-
- /**
- * If set, the configured header will be added to message
- * sent to consumer if the message is a redelivery. It will be
- * set to the number of re-deliveries that have occurred.
- */
- @XmlAttribute(name="add_redeliveries_header")
- public String add_redeliveries_header;
-
- @XmlAttribute(name="max_header_length")
- public String max_header_length;
-
- @XmlAttribute(name="max_headers")
- public Integer max_headers;
-
- @XmlAttribute(name="max_data_length")
- public String max_data_length;
+ @XmlAttribute(name="max_frame_size")
+ public String max_frame_size;
@XmlElementRef
public List<ProtocolFilterDTO> protocol_filters = new ArrayList<ProtocolFilterDTO>();
@@ -115,14 +88,6 @@ public class AmqpDTO extends ProtocolDTO
AmqpDTO amqpDTO = (AmqpDTO) o;
- if (add_redeliveries_header != null ? !add_redeliveries_header.equals(amqpDTO.add_redeliveries_header) : amqpDTO.add_redeliveries_header != null)
- return false;
- if (add_timestamp_header != null ? !add_timestamp_header.equals(amqpDTO.add_timestamp_header) : amqpDTO.add_timestamp_header != null)
- return false;
- if (add_user_header != null ? !add_user_header.equals(amqpDTO.add_user_header) : amqpDTO.add_user_header != null)
- return false;
- if (add_user_headers != null ? !add_user_headers.equals(amqpDTO.add_user_headers) : amqpDTO.add_user_headers != null)
- return false;
if (any_child_wildcard != null ? !any_child_wildcard.equals(amqpDTO.any_child_wildcard) : amqpDTO.any_child_wildcard != null)
return false;
if (any_descendant_wildcard != null ? !any_descendant_wildcard.equals(amqpDTO.any_descendant_wildcard) : amqpDTO.any_descendant_wildcard != null)
@@ -131,12 +96,9 @@ public class AmqpDTO extends ProtocolDTO
return false;
if (destination_separator != null ? !destination_separator.equals(amqpDTO.destination_separator) : amqpDTO.destination_separator != null)
return false;
- if (die_delay != null ? !die_delay.equals(amqpDTO.die_delay) : amqpDTO.die_delay != null) return false;
- if (max_data_length != null ? !max_data_length.equals(amqpDTO.max_data_length) : amqpDTO.max_data_length != null)
+ if (die_delay != null ? !die_delay.equals(amqpDTO.die_delay) : amqpDTO.die_delay != null)
return false;
- if (max_header_length != null ? !max_header_length.equals(amqpDTO.max_header_length) : amqpDTO.max_header_length != null)
- return false;
- if (max_headers != null ? !max_headers.equals(amqpDTO.max_headers) : amqpDTO.max_headers != null)
+ if (max_frame_size != null ? !max_frame_size.equals(amqpDTO.max_frame_size) : amqpDTO.max_frame_size != null)
return false;
if (path_separator != null ? !path_separator.equals(amqpDTO.path_separator) : amqpDTO.path_separator != null)
return false;
@@ -154,6 +116,8 @@ public class AmqpDTO extends ProtocolDTO
return false;
if (topic_prefix != null ? !topic_prefix.equals(amqpDTO.topic_prefix) : amqpDTO.topic_prefix != null)
return false;
+ if (trace != null ? !trace.equals(amqpDTO.trace) : amqpDTO.trace != null)
+ return false;
return true;
}
@@ -161,13 +125,8 @@ public class AmqpDTO extends ProtocolDTO
@Override
public int hashCode() {
int result = super.hashCode();
- result = 31 * result + (add_user_header != null ? add_user_header.hashCode() : 0);
- result = 31 * result + (add_user_headers != null ? add_user_headers.hashCode() : 0);
- result = 31 * result + (add_timestamp_header != null ? add_timestamp_header.hashCode() : 0);
- result = 31 * result + (add_redeliveries_header != null ? add_redeliveries_header.hashCode() : 0);
- result = 31 * result + (max_header_length != null ? max_header_length.hashCode() : 0);
- result = 31 * result + (max_headers != null ? max_headers.hashCode() : 0);
- result = 31 * result + (max_data_length != null ? max_data_length.hashCode() : 0);
+ result = 31 * result + (trace != null ? trace.hashCode() : 0);
+ result = 31 * result + (max_frame_size != null ? max_frame_size.hashCode() : 0);
result = 31 * result + (protocol_filters != null ? protocol_filters.hashCode() : 0);
result = 31 * result + (queue_prefix != null ? queue_prefix.hashCode() : 0);
result = 31 * result + (topic_prefix != null ? topic_prefix.hashCode() : 0);
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java?rev=1409901&r1=1409900&r2=1409901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java Thu Nov 15 18:11:51 2012
@@ -99,4 +99,11 @@ public class AmqpProtocolCodec extends A
nextDecodeAction = initialDecodeAction();
}
+ public int getMaxFrameSize() {
+ return maxFrameSize;
+ }
+
+ public void setMaxFrameSize(int maxFrameSize) {
+ this.maxFrameSize = maxFrameSize;
+ }
}
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java?rev=1409901&r1=1409900&r2=1409901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java Thu Nov 15 18:11:51 2012
@@ -46,13 +46,6 @@ public class XmlCodecTest {
assertEquals(1, connector.protocols.size());
ProtocolDTO amqp = connector.protocols.get(0);
assertTrue(amqp instanceof AmqpDTO);
- assertEquals("JMSXUserID", ((AmqpDTO) amqp).add_user_header);
-
- List<AddUserHeaderDTO> add_user_headers = ((AmqpDTO) amqp).add_user_headers;
- assertEquals(2, add_user_headers.size());
- assertEquals("GroupId", add_user_headers.get(0).name);
- assertEquals("UserId", add_user_headers.get(1).name);
- assertEquals("UserPrincipal", add_user_headers.get(1).kind);
}