You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/05/23 00:57:51 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5795

Repository: activemq
Updated Branches:
  refs/heads/master 7043f32bb -> 13b915ad1


https://issues.apache.org/jira/browse/AMQ-5795

Allow fallback to another message transformer when the configured one
can't handle the incoming message.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/13b915ad
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/13b915ad
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/13b915ad

Branch: refs/heads/master
Commit: 13b915ad19c502e1cae6cb5a20467bfcb4d51e92
Parents: 7043f32
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri May 22 18:57:42 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri May 22 18:57:42 2015 -0400

----------------------------------------------------------------------
 .../message/AMQPNativeInboundTransformer.java   |  10 ++
 .../amqp/message/AMQPRawInboundTransformer.java |  10 ++
 .../amqp/message/InboundTransformer.java        |   6 +-
 .../message/JMSMappingInboundTransformer.java   |  10 ++
 .../transport/amqp/protocol/AmqpReceiver.java   |  23 ++-
 .../transport/amqp/client/AmqpMessage.java      |  48 +++++
 .../transport/amqp/client/AmqpSession.java      |   2 +-
 .../interop/AmqpDescribedTypePayloadTest.java   | 176 +++++++++++++++++++
 8 files changed, 281 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
index 9789e7b..a28b301 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
@@ -25,6 +25,16 @@ public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
     }
 
     @Override
+    public String getTransformerName() {
+        return TRANSFORMER_NATIVE;  // Identifies this transformer, e.g. in log output
+    }
+
+    @Override
+    public InboundTransformer getFallbackTransformer() {
+        return new AMQPRawInboundTransformer(getVendor());  // If the native transform fails, retry with the raw transformer
+    }
+
+    @Override
     public Message transform(EncodedMessage amqpMessage) throws Exception {
         org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
index 5742a76..d60a96b 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
@@ -26,6 +26,16 @@ public class AMQPRawInboundTransformer extends InboundTransformer {
     }
 
     @Override
+    public String getTransformerName() {
+        return TRANSFORMER_RAW;  // Identifies this transformer, e.g. in log output
+    }
+
+    @Override
+    public InboundTransformer getFallbackTransformer() {
+        return null;  // Raw is the last resort: there is no further transformer to fall back to
+    }
+
+    @Override
     public Message transform(EncodedMessage amqpMessage) throws Exception {
         BytesMessage rc = vendor.createBytesMessage();
         rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());

http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index eff0ae7..1310bcd 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -59,7 +59,11 @@ public abstract class InboundTransformer {
         this.vendor = vendor;
     }
 
-    abstract public Message transform(EncodedMessage amqpMessage) throws Exception;
+    public abstract Message transform(EncodedMessage amqpMessage) throws Exception;
+    // Returns the name identifying this transformer (a TRANSFORMER_* constant in the visible subclasses).
+    public abstract String getTransformerName();
+    // Returns the transformer to try when this one fails to transform a message, or null if there is none.
+    public abstract InboundTransformer getFallbackTransformer();
 
     public int getDefaultDeliveryMode() {
         return defaultDeliveryMode;

http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
index 63e216c..55a4db9 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
@@ -40,6 +40,16 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
         super(vendor);
     }
 
+    @Override
+    public String getTransformerName() {
+        return TRANSFORMER_JMS;  // Identifies this transformer, e.g. in log output
+    }
+
+    @Override
+    public InboundTransformer getFallbackTransformer() {
+        return new AMQPNativeInboundTransformer(getVendor());  // If the JMS mapping fails, retry with the native transformer
+    }
+
     @SuppressWarnings({ "unchecked" })
     @Override
     public Message transform(EncodedMessage amqpMessage) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 5d8b502..6e52fec 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -125,7 +125,7 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
 
     //----- Internal Implementation ------------------------------------------//
 
-    protected InboundTransformer getInboundTransformer() {
+    protected InboundTransformer getTransformer() {
         if (inboundTransformer == null) {
             String transformer = session.getConnection().getConfiguredTransformer();
             if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_JMS)) {
@@ -146,7 +146,26 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
     protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception {
         if (!isClosed()) {
             EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length);
-            final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
+
+            InboundTransformer transformer = getTransformer();
+            ActiveMQMessage message = null;
+            // Try the configured transformer, then each fallback in turn; log whichever one actually failed.
+            while (transformer != null) {
+                try {
+                    message = (ActiveMQMessage) transformer.transform(em);
+                    break;
+                } catch (Exception e) {
+                    LOG.debug("Transform of message using [{}] transformer, failed", transformer.getTransformerName());
+                    LOG.trace("Transformation error:", e);
+                    // A null fallback ends the loop with message still unset.
+                    transformer = transformer.getFallbackTransformer();
+                }
+            }
+
+            if (message == null) {
+                throw new IOException("Failed to transform incoming delivery, skipping.");
+            }
+
             current = null;
 
             if (isAnonymous()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 705a90d..d29d620 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -18,10 +18,12 @@ package org.apache.activemq.transport.amqp.client;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
 import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
@@ -341,6 +343,8 @@ public class AmqpMessage {
         return deliveryAnnotationsMap.get(Symbol.valueOf(key));
     }
 
+    //----- Methods for manipulating the Message body ------------------------//
+
     /**
      * Sets a String value into the body of an outgoing Message, throws
      * an exception if this is an incoming message instance.
@@ -371,6 +375,50 @@ public class AmqpMessage {
         getWrappedMessage().setBody(body);
     }
 
+    /**
+     * Sets a DescribedType value into the body of an outgoing Message, throws
+     * an exception if this is an incoming message instance.
+     *
+     * @param described
+     *        the DescribedType to store in the Message body, wrapped in an AmqpValue.
+     *
+     * @throws IllegalStateException if the message is read only.
+     */
+    public void setDescribedType(DescribedType described) throws IllegalStateException {
+        checkReadOnly();
+        AmqpValue body = new AmqpValue(described);
+        getWrappedMessage().setBody(body);
+    }
+
+    /**
+     * Attempts to retrieve the message body as a DescribedType instance.
+     *
+     * @return the DescribedType held in an AmqpValue body, or null if the body is absent, is not an AmqpValue, or holds null.
+     *
+     * @throws NoSuchElementException if the AmqpValue body holds a value that is not a DescribedType.
+     */
+    public DescribedType getDescribedType() throws NoSuchElementException {
+        DescribedType result = null;
+
+        if (getWrappedMessage().getBody() == null) {
+            return null;
+        } else {
+            if (getWrappedMessage().getBody() instanceof AmqpValue) {
+                AmqpValue value = (AmqpValue) getWrappedMessage().getBody();
+
+                if (value.getValue() == null) {
+                    result = null;
+                } else if (value.getValue() instanceof DescribedType) {
+                    result = (DescribedType) value.getValue();
+                } else {
+                    throw new NoSuchElementException("Message does not contain a DescribedType body");
+                }
+            }
+        }
+
+        return result;
+    }
+
     //----- Internal implementation ------------------------------------------//
 
     private void checkReadOnly() throws IllegalStateException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index b7ebeec..b934f95 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -398,7 +398,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
     }
 
     private void checkClosed() {
-        if (isClosed()) {
+        if (isClosed() || connection.isClosed()) {  // a closed connection is reported as a closed session too
             throw new IllegalStateException("Session is already closed");
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDescribedTypePayloadTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDescribedTypePayloadTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDescribedTypePayloadTest.java
new file mode 100644
index 0000000..23b1c97
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDescribedTypePayloadTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpNoLocalFilter;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test that the broker can pass through an AMQP message with a described type
+ * in the message body regardless of transformer in use. Each test runs once per transformer: jms, native, raw.
+ */
+@RunWith(Parameterized.class)
+public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
+
+    private final String transformer;
+
+    @Parameters(name="{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            {"jms"},
+            {"native"},
+            {"raw"}
+        });
+    }
+
+    public AmqpDescribedTypePayloadTest(String transformer) {
+        this.transformer = transformer;
+    }
+
+    @Override
+    protected String getAmqpTransformer() {
+        return transformer;
+    }
+
+    @Test(timeout = 60000)
+    public void testSendMessageWithDescribedTypeInBody() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+        // Send one message whose body carries a described type.
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpMessage message = new AmqpMessage();
+        message.setDescribedType(new AmqpNoLocalFilter());
+        sender.send(message);
+        sender.close();
+        // The queue should now report exactly one message.
+        QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(1, queue.getQueueSize());
+        // Read it back over AMQP and check that a described type body is still present.
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(1);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+        assertNotNull(received.getDescribedType());
+        receiver.close();
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testSendMessageWithDescribedTypeInBodyReceiveOverOpenWire() throws Exception {
+        // Send one described-type message over AMQP, then close the AMQP connection.
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpMessage message = new AmqpMessage();
+        message.setDescribedType(new AmqpNoLocalFilter());
+        sender.send(message);
+        sender.close();
+        connection.close();
+        // The message must still be queued after the sending connection is closed.
+        QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(1, queue.getQueueSize());
+        // Consume it with a JMS client connected through the broker's VM connector.
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
+        Connection jmsConnection = factory.createConnection();
+        Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = jmsSession.createQueue(getTestName());
+        MessageConsumer jmsConsumer = jmsSession.createConsumer(destination);
+        jmsConnection.start();
+        // The test expects a BytesMessage for every parameterized transformer.
+        Message received = jmsConsumer.receive(5000);
+        assertNotNull(received);
+        assertTrue(received instanceof BytesMessage);
+        jmsConnection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testDescribedTypeMessageRoundTrips() throws Exception {
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        // Send with AMQP client.
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpMessage message = new AmqpMessage();
+        message.setDescribedType(new AmqpNoLocalFilter());
+        sender.send(message);
+        sender.close();
+
+        QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(1, queue.getQueueSize());
+
+        // Receive and resend with OpenWire JMS client
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
+        Connection jmsConnection = factory.createConnection();
+        Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = jmsSession.createQueue(getTestName());
+        MessageConsumer jmsConsumer = jmsSession.createConsumer(destination);
+        jmsConnection.start();
+
+        Message received = jmsConsumer.receive(5000);
+        assertNotNull(received);
+        assertTrue(received instanceof BytesMessage);
+        // Put the consumed message straight back onto the same queue.
+        MessageProducer jmsProducer = jmsSession.createProducer(destination);
+        jmsProducer.send(received);
+        jmsConnection.close();
+        // After the consume and resend, the queue should again report one message.
+        assertEquals(1, queue.getQueueSize());
+
+        // Now lets receive it with AMQP and see that we get back what we expected.
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(1);
+        AmqpMessage returned = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(returned);
+        assertNotNull(returned.getDescribedType());
+        receiver.close();
+        connection.close();
+    }
+}