You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/30 14:31:04 UTC

svn commit: r1403689 - in /activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp: AmqpProtocolConverter.java transform/InboundTransformer.java

Author: chirino
Date: Tue Oct 30 13:31:04 2012
New Revision: 1403689

URL: http://svn.apache.org/viewvc?rev=1403689&view=rev
Log:
Temp queues verified to work now.

Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1403689&r1=1403688&r2=1403689&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Tue Oct 30 13:31:04 2012
@@ -324,7 +324,6 @@ class AmqpProtocolConverter {
     private ConnectionInfo connectionInfo = new ConnectionInfo();
     private long nextSessionId = 0;
     private long nextTempDestinationId = 0;
-    HashMap<Sender, ActiveMQDestination> tempDestinations = new HashMap<Sender, ActiveMQDestination>();
 
     static abstract class AmqpDeliveryListener {
         abstract public void onDelivery(Delivery delivery) throws Exception;
@@ -604,8 +603,19 @@ class AmqpProtocolConverter {
             receiver.open();
             pumpProtonToSocket();
         } else {
+            org.apache.qpid.proton.type.messaging.Target target = (Target) remoteTarget;
             ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
-            ActiveMQDestination dest = createDestination(remoteTarget);
+            ActiveMQDestination dest;
+            if( target.getDynamic() ) {
+                dest = createTempQueue();
+                org.apache.qpid.proton.type.messaging.Target actualTarget = new org.apache.qpid.proton.type.messaging.Target();
+                actualTarget.setAddress(dest.getQualifiedName());
+                actualTarget.setDynamic(true);
+                receiver.setTarget(actualTarget);
+            } else {
+                dest = createDestination(remoteTarget);
+            }
+
             ProducerContext producerContext = new ProducerContext(producerId, dest);
 
             receiver.setContext(producerContext);
@@ -645,12 +655,6 @@ class AmqpProtocolConverter {
         }
     }
 
-    private Source createSource(ActiveMQDestination dest) {
-        org.apache.qpid.proton.type.messaging.Source rc = new org.apache.qpid.proton.type.messaging.Source();
-        rc.setAddress(getInboundTransformer().getVendor().toAddress(dest));
-        return rc;
-    }
-
     OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
 
 
@@ -736,22 +740,27 @@ class AmqpProtocolConverter {
                 final MessageDispatch md = outbound.removeFirst();
                 try {
                     final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
-                    jms.setRedeliveryCounter(md.getRedeliveryCounter());
-                    final EncodedMessage amqp = outboundTransformer.transform(jms);
-                    if( amqp!=null && amqp.getLength() > 0 ) {
-
-                        currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
-                        if( presettle ) {
-                            currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
-                        } else {
-                            final byte[] tag = nextTag();
-                            currentDelivery = sender.delivery(tag, 0, tag.length);
-                        }
-                        currentDelivery.setContext(md);
-
+                    if( jms==null ) {
+                        // It's the end of browse signal.
+                        sender.drained();
                     } else {
-                        // TODO: message could not be generated what now?
+                        jms.setRedeliveryCounter(md.getRedeliveryCounter());
+                        final EncodedMessage amqp = outboundTransformer.transform(jms);
+                        if( amqp!=null && amqp.getLength() > 0 ) {
+
+                            currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
+                            if( presettle ) {
+                                currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
+                            } else {
+                                final byte[] tag = nextTag();
+                                currentDelivery = sender.delivery(tag, 0, tag.length);
+                            }
+                            currentDelivery.setContext(md);
+
+                        } else {
+                            // TODO: message could not be generated what now?
 
+                        }
                     }
                 } catch (Exception e) {
                     e.printStackTrace();
@@ -857,27 +866,18 @@ class AmqpProtocolConverter {
         subscriptionsByConsumerId.put(id, consumerContext);
 
         ActiveMQDestination dest;
-        final Source remoteSource = sender.getRemoteSource();
-        if( remoteSource != null ) {
-            dest = createDestination(remoteSource);
+        org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)sender.getRemoteSource();
+        if( source != null && !source.getDynamic() ) {
+            dest = createDestination(source);
         } else {
             // lets create a temp dest.
-//            if (topic) {
-//                dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
-//            } else {
-                dest = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
-//            }
-
-            DestinationInfo info = new DestinationInfo();
-            info.setConnectionId(connectionId);
-            info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
-            info.setDestination(dest);
-            sendToActiveMQ(info, null);
-            tempDestinations.put(sender, dest);
-            sender.setSource(createSource(dest));
+            dest = createTempQueue();
+            source = new org.apache.qpid.proton.type.messaging.Source();
+            source.setAddress(dest.getQualifiedName());
+            source.setDynamic(true);
+            sender.setSource(source);
         }
 
-
         sender.setContext(consumerContext);
         ConsumerInfo consumerInfo = new ConsumerInfo(id);
         consumerInfo.setDestination(dest);
@@ -903,6 +903,17 @@ class AmqpProtocolConverter {
 
     }
 
+    private ActiveMQDestination createTempQueue() {
+        ActiveMQDestination rc;
+        rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
+        DestinationInfo info = new DestinationInfo();
+        info.setConnectionId(connectionId);
+        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
+        info.setDestination(rc);
+        sendToActiveMQ(info, null);
+        return rc;
+    }
+
 //    void onUnSubscribe(UNSUBSCRIBE command) {
 //        UTF8Buffer[] topics = command.topics();
 //        if (topics != null) {

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java?rev=1403689&r1=1403688&r2=1403689&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java Tue Oct 30 13:31:04 2012
@@ -194,14 +194,17 @@ public abstract class InboundTransformer
 
     private void setProperty(Message msg, String key, Object value) throws JMSException {
         //TODO support all types
-        if( value instanceof String ) {
-            msg.setStringProperty(key, (String) value);
-        } else if( value instanceof Integer ) {
-            msg.setIntProperty(key, ((Integer) value).intValue());
-        } else if( value instanceof Long ) {
-            msg.setLongProperty(key, ((Long) value).longValue());
-        } else {
-            throw new RuntimeException("Unexpected value type: "+value.getClass());
-        }
+        msg.setObjectProperty(key, value);
+//        if( value instanceof String ) {
+//            msg.setStringProperty(key, (String) value);
+//        } else if( value instanceof Double ) {
+//            msg.setDoubleProperty(key, ((Double) value).doubleValue());
+//        } else if( value instanceof Integer ) {
+//            msg.setIntProperty(key, ((Integer) value).intValue());
+//        } else if( value instanceof Long ) {
+//            msg.setLongProperty(key, ((Long) value).longValue());
+//        } else {
+//            throw new RuntimeException("Unexpected value type: "+value.getClass());
+//        }
     }
 }