You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/30 14:31:04 UTC
svn commit: r1403689 - in /activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp: AmqpProtocolConverter.java transform/InboundTransformer.java
Author: chirino
Date: Tue Oct 30 13:31:04 2012
New Revision: 1403689
URL: http://svn.apache.org/viewvc?rev=1403689&view=rev
Log:
Temp queues verified to work now.
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1403689&r1=1403688&r2=1403689&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Tue Oct 30 13:31:04 2012
@@ -324,7 +324,6 @@ class AmqpProtocolConverter {
private ConnectionInfo connectionInfo = new ConnectionInfo();
private long nextSessionId = 0;
private long nextTempDestinationId = 0;
- HashMap<Sender, ActiveMQDestination> tempDestinations = new HashMap<Sender, ActiveMQDestination>();
static abstract class AmqpDeliveryListener {
abstract public void onDelivery(Delivery delivery) throws Exception;
@@ -604,8 +603,19 @@ class AmqpProtocolConverter {
receiver.open();
pumpProtonToSocket();
} else {
+ org.apache.qpid.proton.type.messaging.Target target = (Target) remoteTarget;
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
- ActiveMQDestination dest = createDestination(remoteTarget);
+ ActiveMQDestination dest;
+ if( target.getDynamic() ) {
+ dest = createTempQueue();
+ org.apache.qpid.proton.type.messaging.Target actualTarget = new org.apache.qpid.proton.type.messaging.Target();
+ actualTarget.setAddress(dest.getQualifiedName());
+ actualTarget.setDynamic(true);
+ receiver.setTarget(actualTarget);
+ } else {
+ dest = createDestination(remoteTarget);
+ }
+
ProducerContext producerContext = new ProducerContext(producerId, dest);
receiver.setContext(producerContext);
@@ -645,12 +655,6 @@ class AmqpProtocolConverter {
}
}
- private Source createSource(ActiveMQDestination dest) {
- org.apache.qpid.proton.type.messaging.Source rc = new org.apache.qpid.proton.type.messaging.Source();
- rc.setAddress(getInboundTransformer().getVendor().toAddress(dest));
- return rc;
- }
-
OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
@@ -736,22 +740,27 @@ class AmqpProtocolConverter {
final MessageDispatch md = outbound.removeFirst();
try {
final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
- jms.setRedeliveryCounter(md.getRedeliveryCounter());
- final EncodedMessage amqp = outboundTransformer.transform(jms);
- if( amqp!=null && amqp.getLength() > 0 ) {
-
- currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
- if( presettle ) {
- currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
- } else {
- final byte[] tag = nextTag();
- currentDelivery = sender.delivery(tag, 0, tag.length);
- }
- currentDelivery.setContext(md);
-
+ if( jms==null ) {
+ // It's the end of browse signal.
+ sender.drained();
} else {
- // TODO: message could not be generated what now?
+ jms.setRedeliveryCounter(md.getRedeliveryCounter());
+ final EncodedMessage amqp = outboundTransformer.transform(jms);
+ if( amqp!=null && amqp.getLength() > 0 ) {
+
+ currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
+ if( presettle ) {
+ currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
+ } else {
+ final byte[] tag = nextTag();
+ currentDelivery = sender.delivery(tag, 0, tag.length);
+ }
+ currentDelivery.setContext(md);
+
+ } else {
+ // TODO: message could not be generated what now?
+ }
}
} catch (Exception e) {
e.printStackTrace();
@@ -857,27 +866,18 @@ class AmqpProtocolConverter {
subscriptionsByConsumerId.put(id, consumerContext);
ActiveMQDestination dest;
- final Source remoteSource = sender.getRemoteSource();
- if( remoteSource != null ) {
- dest = createDestination(remoteSource);
+ org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)sender.getRemoteSource();
+ if( source != null && !source.getDynamic() ) {
+ dest = createDestination(source);
} else {
// lets create a temp dest.
-// if (topic) {
-// dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
-// } else {
- dest = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
-// }
-
- DestinationInfo info = new DestinationInfo();
- info.setConnectionId(connectionId);
- info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
- info.setDestination(dest);
- sendToActiveMQ(info, null);
- tempDestinations.put(sender, dest);
- sender.setSource(createSource(dest));
+ dest = createTempQueue();
+ source = new org.apache.qpid.proton.type.messaging.Source();
+ source.setAddress(dest.getQualifiedName());
+ source.setDynamic(true);
+ sender.setSource(source);
}
-
sender.setContext(consumerContext);
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setDestination(dest);
@@ -903,6 +903,17 @@ class AmqpProtocolConverter {
}
+ private ActiveMQDestination createTempQueue() {
+ ActiveMQDestination rc;
+ rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
+ DestinationInfo info = new DestinationInfo();
+ info.setConnectionId(connectionId);
+ info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
+ info.setDestination(rc);
+ sendToActiveMQ(info, null);
+ return rc;
+ }
+
// void onUnSubscribe(UNSUBSCRIBE command) {
// UTF8Buffer[] topics = command.topics();
// if (topics != null) {
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java?rev=1403689&r1=1403688&r2=1403689&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java Tue Oct 30 13:31:04 2012
@@ -194,14 +194,17 @@ public abstract class InboundTransformer
private void setProperty(Message msg, String key, Object value) throws JMSException {
//TODO support all types
- if( value instanceof String ) {
- msg.setStringProperty(key, (String) value);
- } else if( value instanceof Integer ) {
- msg.setIntProperty(key, ((Integer) value).intValue());
- } else if( value instanceof Long ) {
- msg.setLongProperty(key, ((Long) value).longValue());
- } else {
- throw new RuntimeException("Unexpected value type: "+value.getClass());
- }
+ msg.setObjectProperty(key, value);
+// if( value instanceof String ) {
+// msg.setStringProperty(key, (String) value);
+// } else if( value instanceof Double ) {
+// msg.setDoubleProperty(key, ((Double) value).doubleValue());
+// } else if( value instanceof Integer ) {
+// msg.setIntProperty(key, ((Integer) value).intValue());
+// } else if( value instanceof Long ) {
+// msg.setLongProperty(key, ((Long) value).longValue());
+// } else {
+// throw new RuntimeException("Unexpected value type: "+value.getClass());
+// }
}
}