You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/05/23 00:57:51 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5795
Repository: activemq
Updated Branches:
refs/heads/master 7043f32bb -> 13b915ad1
https://issues.apache.org/jira/browse/AMQ-5795
Allow fallback of message transformers when configured one can't handle
the incoming message.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/13b915ad
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/13b915ad
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/13b915ad
Branch: refs/heads/master
Commit: 13b915ad19c502e1cae6cb5a20467bfcb4d51e92
Parents: 7043f32
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri May 22 18:57:42 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri May 22 18:57:42 2015 -0400
----------------------------------------------------------------------
.../message/AMQPNativeInboundTransformer.java | 10 ++
.../amqp/message/AMQPRawInboundTransformer.java | 10 ++
.../amqp/message/InboundTransformer.java | 6 +-
.../message/JMSMappingInboundTransformer.java | 10 ++
.../transport/amqp/protocol/AmqpReceiver.java | 23 ++-
.../transport/amqp/client/AmqpMessage.java | 48 +++++
.../transport/amqp/client/AmqpSession.java | 2 +-
.../interop/AmqpDescribedTypePayloadTest.java | 176 +++++++++++++++++++
8 files changed, 281 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
index 9789e7b..a28b301 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java
@@ -25,6 +25,16 @@ public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
}
@Override
+ public String getTransformerName() {
+ return TRANSFORMER_NATIVE;
+ }
+
+ @Override
+ public InboundTransformer getFallbackTransformer() {
+ return new AMQPRawInboundTransformer(getVendor());
+ }
+
+ @Override
public Message transform(EncodedMessage amqpMessage) throws Exception {
org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
index 5742a76..d60a96b 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
@@ -26,6 +26,16 @@ public class AMQPRawInboundTransformer extends InboundTransformer {
}
@Override
+ public String getTransformerName() {
+ return TRANSFORMER_RAW;
+ }
+
+ @Override
+ public InboundTransformer getFallbackTransformer() {
+ return null; // No fallback from full raw transform
+ }
+
+ @Override
public Message transform(EncodedMessage amqpMessage) throws Exception {
BytesMessage rc = vendor.createBytesMessage();
rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index eff0ae7..1310bcd 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -59,7 +59,11 @@ public abstract class InboundTransformer {
this.vendor = vendor;
}
- abstract public Message transform(EncodedMessage amqpMessage) throws Exception;
+ public abstract Message transform(EncodedMessage amqpMessage) throws Exception;
+
+ public abstract String getTransformerName();
+
+ public abstract InboundTransformer getFallbackTransformer();
public int getDefaultDeliveryMode() {
return defaultDeliveryMode;
http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
index 63e216c..55a4db9 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
@@ -40,6 +40,16 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
super(vendor);
}
+ @Override
+ public String getTransformerName() {
+ return TRANSFORMER_JMS;
+ }
+
+ @Override
+ public InboundTransformer getFallbackTransformer() {
+ return new AMQPNativeInboundTransformer(getVendor());
+ }
+
@SuppressWarnings({ "unchecked" })
@Override
public Message transform(EncodedMessage amqpMessage) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 5d8b502..6e52fec 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -125,7 +125,7 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
//----- Internal Implementation ------------------------------------------//
- protected InboundTransformer getInboundTransformer() {
+ protected InboundTransformer getTransformer() {
if (inboundTransformer == null) {
String transformer = session.getConnection().getConfiguredTransformer();
if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_JMS)) {
@@ -146,7 +146,26 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception {
if (!isClosed()) {
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length);
- final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
+
+ InboundTransformer transformer = getTransformer();
+ ActiveMQMessage message = null;
+
+ while (transformer != null) {
+ try {
+ message = (ActiveMQMessage) transformer.transform(em);
+ break;
+ } catch (Exception e) {
+ LOG.debug("Transform of message using [{}] transformer, failed", getTransformer().getTransformerName());
+ LOG.trace("Transformation error:", e);
+
+ transformer = transformer.getFallbackTransformer();
+ }
+ }
+
+ if (message == null) {
+ throw new IOException("Failed to transform incoming delivery, skipping.");
+ }
+
current = null;
if (isAnonymous()) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 705a90d..d29d620 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -18,10 +18,12 @@ package org.apache.activemq.transport.amqp.client;
import java.util.HashMap;
import java.util.Map;
+import java.util.NoSuchElementException;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
@@ -341,6 +343,8 @@ public class AmqpMessage {
return deliveryAnnotationsMap.get(Symbol.valueOf(key));
}
+ //----- Methods for manipulating the Message body ------------------------//
+
/**
* Sets a String value into the body of an outgoing Message, throws
* an exception if this is an incoming message instance.
@@ -371,6 +375,50 @@ public class AmqpMessage {
getWrappedMessage().setBody(body);
}
+ /**
+ * Sets a byte array value into the body of an outgoing Message, throws
+ * an exception if this is an incoming message instance.
+ *
+ * @param value
+ * the byte array value to store in the Message body.
+ *
+ * @throws IllegalStateException if the message is read only.
+ */
+ public void setDescribedType(DescribedType described) throws IllegalStateException {
+ checkReadOnly();
+ AmqpValue body = new AmqpValue(described);
+ getWrappedMessage().setBody(body);
+ }
+
+ /**
+ * Attempts to retrieve the message body as an DescribedType instance.
+ *
+ * @return an DescribedType instance if one is stored in the message body.
+ *
+ * @throws NoSuchElementException if the body does not contain a DescribedType.
+ */
+ public DescribedType getDescribedType() throws NoSuchElementException {
+ DescribedType result = null;
+
+ if (getWrappedMessage().getBody() == null) {
+ return null;
+ } else {
+ if (getWrappedMessage().getBody() instanceof AmqpValue) {
+ AmqpValue value = (AmqpValue) getWrappedMessage().getBody();
+
+ if (value.getValue() == null) {
+ result = null;
+ } else if (value.getValue() instanceof DescribedType) {
+ result = (DescribedType) value.getValue();
+ } else {
+ throw new NoSuchElementException("Message does not contain a DescribedType body");
+ }
+ }
+ }
+
+ return result;
+ }
+
//----- Internal implementation ------------------------------------------//
private void checkReadOnly() throws IllegalStateException {
http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index b7ebeec..b934f95 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -398,7 +398,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
}
private void checkClosed() {
- if (isClosed()) {
+ if (isClosed() || connection.isClosed()) {
throw new IllegalStateException("Session is already closed");
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/13b915ad/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDescribedTypePayloadTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDescribedTypePayloadTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDescribedTypePayloadTest.java
new file mode 100644
index 0000000..23b1c97
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDescribedTypePayloadTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpNoLocalFilter;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test that the broker can pass through an AMQP message with a described type
+ * in the message body regardless of transformer in use.
+ */
+@RunWith(Parameterized.class)
+public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
+
+ private final String transformer;
+
+ @Parameters(name="{0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"jms"},
+ {"native"},
+ {"raw"}
+ });
+ }
+
+ public AmqpDescribedTypePayloadTest(String transformer) {
+ this.transformer = transformer;
+ }
+
+ @Override
+ protected String getAmqpTransformer() {
+ return transformer;
+ }
+
+ @Test(timeout = 60000)
+ public void testSendMessageWithDescribedTypeInBody() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender("queue://" + getTestName());
+ AmqpMessage message = new AmqpMessage();
+ message.setDescribedType(new AmqpNoLocalFilter());
+ sender.send(message);
+ sender.close();
+
+ QueueViewMBean queue = getProxyToQueue(getTestName());
+ assertEquals(1, queue.getQueueSize());
+
+ AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+ assertNotNull(received.getDescribedType());
+ receiver.close();
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testSendMessageWithDescribedTypeInBodyReceiveOverOpenWire() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender("queue://" + getTestName());
+ AmqpMessage message = new AmqpMessage();
+ message.setDescribedType(new AmqpNoLocalFilter());
+ sender.send(message);
+ sender.close();
+ connection.close();
+
+ QueueViewMBean queue = getProxyToQueue(getTestName());
+ assertEquals(1, queue.getQueueSize());
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
+ Connection jmsConnection = factory.createConnection();
+ Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = jmsSession.createQueue(getTestName());
+ MessageConsumer jmsConsumer = jmsSession.createConsumer(destination);
+ jmsConnection.start();
+
+ Message received = jmsConsumer.receive(5000);
+ assertNotNull(received);
+ assertTrue(received instanceof BytesMessage);
+ jmsConnection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testDescribedTypeMessageRoundTrips() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ // Send with AMQP client.
+ AmqpSender sender = session.createSender("queue://" + getTestName());
+ AmqpMessage message = new AmqpMessage();
+ message.setDescribedType(new AmqpNoLocalFilter());
+ sender.send(message);
+ sender.close();
+
+ QueueViewMBean queue = getProxyToQueue(getTestName());
+ assertEquals(1, queue.getQueueSize());
+
+ // Receive and resend with OpenWire JMS client
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
+ Connection jmsConnection = factory.createConnection();
+ Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = jmsSession.createQueue(getTestName());
+ MessageConsumer jmsConsumer = jmsSession.createConsumer(destination);
+ jmsConnection.start();
+
+ Message received = jmsConsumer.receive(5000);
+ assertNotNull(received);
+ assertTrue(received instanceof BytesMessage);
+
+ MessageProducer jmsProducer = jmsSession.createProducer(destination);
+ jmsProducer.send(received);
+ jmsConnection.close();
+
+ assertEquals(1, queue.getQueueSize());
+
+ // Now lets receive it with AMQP and see that we get back what we expected.
+ AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+ receiver.flow(1);
+ AmqpMessage returned = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(returned);
+ assertNotNull(returned.getDescribedType());
+ receiver.close();
+ connection.close();
+ }
+}