You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/03 23:46:47 UTC
svn commit: r1393782 - in /activemq/trunk: ./
activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/
activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/
activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/
Author: chirino
Date: Wed Oct 3 21:46:46 2012
New Revision: 1393782
URL: http://svn.apache.org/viewvc?rev=1393782&view=rev
Log:
Making more progress on the AMQP implementation.
Added:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
- copied, changed from r1393500, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java
- copied, changed from r1393500, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
- copied, changed from r1393500, activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java
Removed:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java
activemq/trunk/pom.xml
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java?rev=1393782&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java Wed Oct 3 21:46:46 2012
@@ -0,0 +1,67 @@
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.command.*;
+import org.apache.activemq.transport.amqp.transform.JMSVendor;
+
+import javax.jms.*;
+import javax.jms.Message;
+
+/** {@link JMSVendor} that creates ActiveMQ's own message and destination objects.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ActiveMQJMSVendor extends JMSVendor {
+
+    /** The only instance; the constructor is private. */
+    public static final ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();
+    private ActiveMQJMSVendor() {}
+
+    @Override
+    public Message createMessage() {
+        return new ActiveMQMessage();
+    }
+
+    @Override
+    public TextMessage createTextMessage() {
+        return new ActiveMQTextMessage();
+    }
+
+    @Override
+    public BytesMessage createBytesMessage() {
+        return new ActiveMQBytesMessage();
+    }
+
+    @Override
+    public MapMessage createMapMessage() {
+        return new ActiveMQMapMessage();
+    }
+
+    @Override
+    public StreamMessage createStreamMessage() {
+        return new ActiveMQStreamMessage();
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() {
+        return new ActiveMQObjectMessage();
+    }
+
+    @Override
+    public Destination createDestination(String name) {
+        return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
+    }
+    // JMSX setters: the casts below reject any Message that is not an ActiveMQMessage.
+    @Override
+    public void setJMSXUserID(Message msg, String value) {
+        ((ActiveMQMessage) msg).setUserID(value);
+    }
+
+    @Override
+    public void setJMSXGroupID(Message msg, String value) {
+        ((ActiveMQMessage) msg).setGroupID(value);
+    }
+
+    @Override
+    public void setJMSXGroupSequence(Message msg, int value) {
+        ((ActiveMQMessage) msg).setGroupSequence(value);
+    }
+}
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1393782&r1=1393781&r2=1393782&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Wed Oct 3 21:46:46 2012
@@ -18,26 +18,23 @@ package org.apache.activemq.transport.am
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.*;
-import org.apache.activemq.command.Message;
+import org.apache.activemq.transport.amqp.transform.*;
import org.apache.activemq.util.*;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
-import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.engine.impl.TransportImpl;
-import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.zip.Inflater;
+
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
class AmqpProtocolConverter {
@@ -80,7 +77,7 @@ class AmqpProtocolConverter {
this.protonTransport.bind(this.protonConnection);
}
- void pumpOut() {
+ void pumpProtonToSocket() {
try {
int size = 1024 * 64;
byte data[] = new byte[size];
@@ -158,11 +155,7 @@ class AmqpProtocolConverter {
link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
while (link != null) {
- if (link instanceof Receiver) {
-// listener.onReceiverClose((Receiver) link);
- } else {
-// listener.onSenderClose((Sender) link);
- }
+ ((AmqpDeliveryListener)link.getContext()).onClose();
link.close();
link = link.next(ACTIVE_STATE, CLOSED_STATE);
}
@@ -170,8 +163,7 @@ class AmqpProtocolConverter {
session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
while (session != null) {
//TODO - close links?
-// listener.onSessionClose(session);
- session.close();
+ onSessionClose(session);
session = session.next(ACTIVE_STATE, CLOSED_STATE);
}
if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
@@ -183,7 +175,7 @@ class AmqpProtocolConverter {
handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
}
- pumpOut();
+ pumpProtonToSocket();
}
public void onActiveMQCommand(Command command) throws Exception {
@@ -223,6 +215,7 @@ class AmqpProtocolConverter {
static abstract class AmqpDeliveryListener {
abstract public void onDelivery(Delivery delivery) throws Exception;
+ public void onClose() throws Exception {}
}
private void onConnectionOpen() throws AmqpProtocolException {
@@ -255,14 +248,14 @@ class AmqpProtocolConverter {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
protonConnection.open();
- pumpOut();
+ pumpProtonToSocket();
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
// TODO: figure out how to close /w an error.
// protonConnection.setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
protonConnection.close();
- pumpOut();
+ pumpProtonToSocket();
amqpTransport.onException(IOExceptionSupport.create(exception));
return;
}
@@ -278,6 +271,12 @@ class AmqpProtocolConverter {
session.open();
}
+ private void onSessionClose(Session session) {
+ AmqpSessionContext sessionContext = (AmqpSessionContext)session.getContext();
+ sendToActiveMQ(new RemoveInfo(sessionContext.sessionId), null);
+ session.close();
+ }
+
private void onLinkOpen(Link link) {
link.setLocalSourceAddress(link.getRemoteSourceAddress());
link.setLocalTargetAddress(link.getRemoteTargetAddress());
@@ -290,54 +289,54 @@ class AmqpProtocolConverter {
}
}
+ InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+
class ProducerContext extends AmqpDeliveryListener {
private final ProducerId producerId;
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final ActiveMQDestination destination;
+ ByteArrayOutputStream current = new ByteArrayOutputStream();
public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
this.producerId = producerId;
this.destination = destination;
}
+
@Override
- public void onDelivery(Delivery delivery) throws JMSException {
-// delivery.
- ActiveMQMessage message = convertMessage((DeliveryImpl) delivery);
+ public void onDelivery(Delivery delivery) throws Exception {
+ if( current ==null ) {
+ current = new ByteArrayOutputStream();
+ }
+
+ Receiver receiver = ((Receiver)delivery.getLink());
+ int count;
+ byte data[] = new byte[1024*4];
+ while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
+ current.write(data, 0, count);
+ }
+
+ // Expecting more deliveries..
+ if( count == 0 ) {
+ return;
+ }
+
+ final Buffer buffer = current.toBuffer();
+ final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
+ current = null;
+
+ if( message.getDestination()==null ) {
+ message.setJMSDestination(destination);
+ }
message.setProducerId(producerId);
+ if( message.getMessageId()==null ) {
+ message.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
+ }
message.onSend();
// sendToActiveMQ(message, createResponseHandler(command));
sendToActiveMQ(message, null);
}
- ActiveMQMessage convertMessage(DeliveryImpl delivery) throws JMSException {
- ActiveMQBytesMessage msg = nextMessage(delivery);
- final Receiver receiver = (Receiver) delivery.getLink();
- byte buff[] = new byte[1024 * 4];
- int count = 0;
- while ((count = receiver.recv(buff, 0, buff.length)) >= 0) {
- msg.writeBytes(buff, 0, count);
- }
- return msg;
- }
-
- ActiveMQBytesMessage current;
-
- private ActiveMQBytesMessage nextMessage(DeliveryImpl delivery) throws JMSException {
- if (current == null) {
- current = new ActiveMQBytesMessage();
- current.setJMSDestination(destination);
- current.setProducerId(producerId);
- current.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
- current.setTimestamp(System.currentTimeMillis());
- current.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
-// msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
-// msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
- System.out.println(delivery.getLocalState() + "/" + delivery.getRemoteState());
- }
- return current;
- }
-
}
@@ -345,7 +344,7 @@ class AmqpProtocolConverter {
// Client is producing to this receiver object
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
- ActiveMQDestination destination = ActiveMQDestination.createDestination(receiver.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE);
+ ActiveMQDestination destination = ActiveMQDestination.createDestination(receiver.getRemoteTargetAddress(), ActiveMQDestination.QUEUE_TYPE);
ProducerContext producerContext = new ProducerContext(producerId, destination);
receiver.setContext(producerContext);
@@ -360,12 +359,13 @@ class AmqpProtocolConverter {
Throwable exception = ((ExceptionResponse) response).getException();
receiver.close();
}
- pumpOut();
+ pumpProtonToSocket();
}
});
}
+ OutboundTransformer outboundTransformer = new AMQPNativeOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
class ConsumerContext extends AmqpDeliveryListener {
private final ConsumerId consumerId;
@@ -395,83 +395,66 @@ class AmqpProtocolConverter {
this.sender = sender;
}
- // called when the connection receives a JMS message from ActiveMQ
- public void onMessageDispatch(MessageDispatch md) throws Exception {
- final byte[] tag = nextTag();
- final Delivery delivery = sender.delivery(tag, 0, tag.length);
- delivery.setContext(md);
+ @Override
+ public void onClose() throws Exception {
+ sendToActiveMQ(new RemoveInfo(consumerId), null);
+ }
- // Covert to an AMQP messages.
- org.apache.qpid.proton.message.Message msg = convertMessage(md.getMessage());
- byte buffer[] = new byte[1024*4];
- int c=0;
+ LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
- // And send the AMQP message over the link.
- while( (c=msg.encode(buffer, 0 , 0)) >= 0 ) {
- sender.send(buffer, 0, c);
- }
- sender.advance();
+ // called when the connection receives a JMS message from ActiveMQ
+ public void onMessageDispatch(MessageDispatch md) throws Exception {
+ outbound.addLast(md);
+ pumpOutbound();
+ pumpProtonToSocket();
+ }
+
+ Buffer current;
+
+ public void pumpOutbound() {
+ while(true) {
+
+ while( current!=null ) {
+ int sent = sender.send(current.data, current.offset, current.length);
+ if( sent > 0 ) {
+ current.moveHead(sent);
+ if( current.length == 0 ) {
+ sender.advance();
+ current = null;
+ }
+ } else {
+ return;
+ }
+ }
- }
+ if( outbound.isEmpty() ) {
+ return;
+ }
- public org.apache.qpid.proton.message.Message convertMessage(Message message) throws Exception {
-// result.setContentEncoding();
-// QoS qoS;
-// if (message.propertyExists(QOS_PROPERTY_NAME)) {
-// int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
-// qoS = QoS.values()[ordinal];
-//
-// } else {
-// qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
-// }
-// result.qos(qoS);
+ final MessageDispatch md = outbound.removeFirst();
+ final byte[] tag = nextTag();
+ final Delivery delivery = sender.delivery(tag, 0, tag.length);
+ delivery.setContext(md);
- Buffer content = null;
- if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
- ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
- msg.setReadOnlyBody(true);
- String messageText = msg.getText();
- content = new Buffer(messageText.getBytes("UTF-8"));
- } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
- ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
- msg.setReadOnlyBody(true);
- byte[] data = new byte[(int) msg.getBodyLength()];
- msg.readBytes(data);
- content = new Buffer(data);
- } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
- ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
- msg.setReadOnlyBody(true);
- Map map = msg.getContentMap();
- content = new Buffer(map.toString().getBytes("UTF-8"));
- } else {
- ByteSequence byteSequence = message.getContent();
- if (byteSequence != null && byteSequence.getLength() > 0) {
- if (message.isCompressed()) {
- Inflater inflater = new Inflater();
- inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
- byte[] data = new byte[4096];
- int read;
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- while ((read = inflater.inflate(data)) != 0) {
- bytesOut.write(data, 0, read);
- }
- byteSequence = bytesOut.toByteSequence();
+ try {
+ final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
+ final byte[] amqpMessage = outboundTransformer.transform(jms);
+ if( amqpMessage!=null && amqpMessage.length > 0 ) {
+ current = new Buffer(amqpMessage);
+ } else {
+ // TODO: message could not be generated what now?
}
- content = new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length);
- } else {
- content = new Buffer(0);
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
-
- org.apache.qpid.proton.message.Message result = new org.apache.qpid.proton.message.Message();
- return result;
}
-
@Override
public void onDelivery(Delivery delivery) throws JMSException {
if( delivery.remotelySettled() ) {
MessageDispatch md = (MessageDispatch) delivery.getContext();
+ pumpOutbound();
}
}
@@ -501,38 +484,12 @@ class AmqpProtocolConverter {
Throwable exception = ((ExceptionResponse) response).getException();
sender.close();
}
- pumpOut();
+ pumpProtonToSocket();
}
});
}
-//
-// QoS onSubscribe(SUBSCRIBE command, Topic topic) throws AmqpProtocolException {
-// ActiveMQDestination destination = new ActiveMQTopic(convertAMQPToActiveMQ(topic.name().toString()));
-// if (destination == null) {
-// throw new AmqpProtocolException("Invalid Destination.");
-// }
-//
-// ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
-// ConsumerInfo consumerInfo = new ConsumerInfo(id);
-// consumerInfo.setDestination(destination);
-// consumerInfo.setPrefetchSize(1000);
-// consumerInfo.setDispatchAsync(true);
-// if (!connect.cleanSession() && (connect.clientId() != null)) {
-// //by default subscribers are persistent
-// consumerInfo.setSubscriptionName(connect.clientId().toString());
-// }
-//
-// AmqpSubscription amqpSubscription = new AmqpSubscription(this, topic.qos(), consumerInfo);
-//
-//
-// amqpSubscriptionByTopic.put(topic.name(), amqpSubscription);
-//
-// sendToActiveMQ(consumerInfo, null);
-// return topic.qos();
-// }
-//
// void onUnSubscribe(UNSUBSCRIBE command) {
// UTF8Buffer[] topics = command.topics();
// if (topics != null) {
Copied: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java (from r1393500, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java?p2=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java&p1=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java&r1=1393500&r2=1393782&rev=1393782&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java Wed Oct 3 21:46:46 2012
@@ -14,11 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.amqp;
+package org.apache.activemq.transport.amqp.transform;
+
+import javax.jms.BytesMessage;
+import javax.jms.Message;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class AMQPNativeInboundTransformer extends InboundTransformer {
+
+    public AMQPNativeInboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+    /** Wraps len still-encoded bytes of amqpMessage, starting at offset, in a new BytesMessage. */
+    @Override
+    public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception {
+        // The AMQP encoding is not parsed here; the bytes become the message body unchanged.
+        BytesMessage rc = vendor.createBytesMessage();
+        rc.writeBytes(amqpMessage, offset, len);
+
+        rc.setJMSDeliveryMode(defaultDeliveryMode);
+        rc.setJMSPriority(defaultPriority);
+
+        final long now = System.currentTimeMillis();
+        rc.setJMSTimestamp(now);
+        if( defaultTtl > 0 ) {
+            rc.setJMSExpiration(now + defaultTtl);
+        }
+        // Flag the body as raw AMQP: AMQPNativeOutboundTransformer returns null unless NATIVE is true.
+        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat);
+        rc.setBooleanProperty(prefixVendor + "NATIVE", true);
+        return rc;
+    }
}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java?rev=1393782&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java Wed Oct 3 21:46:46 2012
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.transform;
+
+import javax.jms.BytesMessage;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class AMQPNativeOutboundTransformer extends OutboundTransformer {
+
+    public AMQPNativeOutboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    /**
+     * Returns the body bytes of a BytesMessage flagged NATIVE, or null when the message is null,
+     * is not a BytesMessage, is not flagged NATIVE, or a property read throws MessageFormatException.
+     */
+    @Override
+    public byte[] transform(Message jms) throws Exception {
+        // instanceof is false for null, so this one guard covers both rejections.
+        if( !(jms instanceof BytesMessage) ) {
+            return null;
+        }
+        final BytesMessage source = (BytesMessage) jms;
+        long messageFormat;
+        try {
+            final boolean rawAmqp = source.getBooleanProperty(prefixVendor + "NATIVE");
+            if( !rawAmqp ) {
+                return null;
+            }
+            messageFormat = source.getLongProperty(prefixVendor + "MESSAGE_FORMAT");
+        } catch (MessageFormatException e) {
+            return null;
+        }
+        // TODO: Proton should probably expose a way to set the msg format
+        // delivery.setMessageFormat(messageFormat);
+        final byte[] body = new byte[(int) source.getBodyLength()];
+        source.readBytes(body);
+        return body;
+    }
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java?rev=1393782&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java Wed Oct 3 21:46:46 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.transform;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+import javax.jms.Message;
+import java.io.IOException;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public abstract class InboundTransformer {
+
+    // Read directly by the transformer subclasses in this package.
+    JMSVendor vendor;
+    String prefixVendor = "JMS_AMQP_";
+    int defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
+    int defaultPriority = Message.DEFAULT_PRIORITY;
+    long defaultTtl = Message.DEFAULT_TIME_TO_LIVE;
+
+    public InboundTransformer(JMSVendor vendor) {
+        this.vendor = vendor;
+    }
+
+    /**
+     * Converts len bytes of data, starting at offset, into a JMS message.
+     * How messageFormat is used is up to the subclass.
+     */
+    public abstract Message transform(long messageFormat, byte[] data, int offset, int len) throws Exception;
+
+    public JMSVendor getVendor() {
+        return vendor;
+    }
+    public void setVendor(JMSVendor vendor) {
+        this.vendor = vendor;
+    }
+
+    public String getPrefixVendor() {
+        return prefixVendor;
+    }
+    public void setPrefixVendor(String prefixVendor) {
+        this.prefixVendor = prefixVendor;
+    }
+
+    public int getDefaultDeliveryMode() {
+        return defaultDeliveryMode;
+    }
+    public void setDefaultDeliveryMode(int defaultDeliveryMode) {
+        this.defaultDeliveryMode = defaultDeliveryMode;
+    }
+
+    public int getDefaultPriority() {
+        return defaultPriority;
+    }
+    public void setDefaultPriority(int defaultPriority) {
+        this.defaultPriority = defaultPriority;
+    }
+
+    public long getDefaultTtl() {
+        return defaultTtl;
+    }
+    public void setDefaultTtl(long defaultTtl) {
+        this.defaultTtl = defaultTtl;
+    }
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java?rev=1393782&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java Wed Oct 3 21:46:46 2012
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.transform;
+
+import org.apache.qpid.proton.type.Binary;
+import org.apache.qpid.proton.type.messaging.*;
+
+import javax.jms.*;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class JMSMappingInboundTransformer extends InboundTransformer {
+
+    String prefixDeliveryAnnotations = "DA_";
+    String prefixMessageAnnotations= "MA_";
+    String prefixFooter = "FT_";
+
+    public JMSMappingInboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    /**
+     * Decodes len bytes of amqpMessage, starting at offset, and copies the AMQP body, header,
+     * annotations, properties, application properties and footer onto a new JMS message.
+     * @return the JMS message, tagged with the MESSAGE_FORMAT and NATIVE=false vendor properties
+     * @throws Exception if decoding stalls, the body type is unexpected or not implemented yet,
+     *         or an annotation / application property / footer value is not a String
+     */
+    @Override
+    public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception {
+        org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message();
+
+        while( len > 0 ) {
+            final int decoded = amqp.decode(amqpMessage, offset, len);
+            if( decoded <= 0 ) { // an assert is skipped without -ea, leaving this loop spinning
+                throw new IllegalStateException("No progress decoding the AMQP message");
+            }
+            offset += decoded;
+            len -= decoded;
+        }
+
+        Message rc;
+        final Section body = amqp.getBody();
+        if( body instanceof Data ) {
+            Binary d = ((Data) body).getValue();
+            BytesMessage m = vendor.createBytesMessage();
+            m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
+            rc = m;
+        } else if (body instanceof AmqpSequence ) {
+            throw new RuntimeException("not implemented"); // TODO: map to a StreamMessage
+        } else if (body instanceof AmqpValue) {
+            Object value = ((AmqpValue) body).getValue();
+            if( value == null ) {
+                rc = vendor.createMessage();
+            } else if( value instanceof String ) { // else-if: a null value must stop at the branch above
+                TextMessage m = vendor.createTextMessage();
+                m.setText((String) value);
+                rc = m;
+            } else if( value instanceof Binary ) {
+                Binary d = (Binary) value;
+                BytesMessage m = vendor.createBytesMessage();
+                m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
+                rc = m;
+            } else if( value instanceof List) {
+                throw new RuntimeException("not implemented"); // TODO: map to a StreamMessage
+            } else if( value instanceof Map) {
+                throw new RuntimeException("not implemented"); // TODO: map to a MapMessage
+            } else {
+                throw new RuntimeException("not implemented"); // TODO: map to an ObjectMessage
+            }
+        } else {
+            throw new RuntimeException("Unexpected body type.");
+        }
+        rc.setJMSDeliveryMode(defaultDeliveryMode);
+        rc.setJMSPriority(defaultPriority);
+
+        // The AMQP ttl is a duration in milliseconds; JMSExpiration is an absolute time (0 = never).
+        long ttl = defaultTtl;
+        final Header header = amqp.getHeader();
+        if( header!=null ) {
+            if( header.getDurable()!=null ) {
+                rc.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+            }
+            if( header.getPriority()!=null ) {
+                rc.setJMSPriority(header.getPriority().intValue());
+            }
+            if( header.getTtl()!=null ) {
+                ttl = header.getTtl().longValue();
+            }
+            if( header.getFirstAcquirer() !=null ) {
+                rc.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
+            }
+            if( header.getDeliveryCount()!=null ) {
+                rc.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue());
+            }
+        }
+        rc.setJMSExpiration(ttl > 0 ? System.currentTimeMillis() + ttl : 0);
+
+        final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
+        if( da!=null ) {
+            for (Map.Entry entry : (Set<Map.Entry>)da.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(rc, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
+            }
+        }
+
+        final MessageAnnotations ma = amqp.getMessageAnnotations();
+        if( ma!=null ) {
+            for (Map.Entry entry : (Set<Map.Entry>)ma.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(rc, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
+            }
+        }
+
+        final Properties properties = amqp.getProperties();
+        if( properties!=null ) {
+            if( properties.getMessageId()!=null ) {
+                rc.setJMSMessageID(properties.getMessageId().toString());
+            }
+            Binary userId = properties.getUserId();
+            if( userId!=null ) {
+                vendor.setJMSXUserID(rc, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
+            }
+            if( properties.getTo()!=null ) {
+                rc.setJMSDestination(vendor.createDestination(properties.getTo()));
+            }
+            if( properties.getSubject()!=null ) {
+                rc.setStringProperty(prefixVendor + "Subject", properties.getSubject());
+            }
+            if( properties.getReplyTo() !=null ) {
+                rc.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
+            }
+            if( properties.getCorrelationId() !=null ) {
+                rc.setJMSCorrelationID(properties.getCorrelationId().toString());
+            }
+            if( properties.getContentType() !=null ) {
+                rc.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
+            }
+            if( properties.getContentEncoding() !=null ) {
+                rc.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
+            }
+            if( properties.getCreationTime()!=null ) {
+                rc.setJMSTimestamp(properties.getCreationTime().getTime());
+            }
+            if( properties.getGroupId()!=null ) {
+                vendor.setJMSXGroupID(rc, properties.getGroupId());
+            }
+            if( properties.getGroupSequence()!=null ) {
+                vendor.setJMSXGroupSequence(rc, properties.getGroupSequence().intValue());
+            }
+            if( properties.getReplyToGroupId()!=null ) {
+                rc.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
+            }
+        }
+
+        final ApplicationProperties ap = amqp.getApplicationProperties();
+        if( ap!=null ) {
+            for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(rc, key, entry.getValue());
+            }
+        }
+
+        final Footer fp = amqp.getFooter();
+        if( fp!=null ) {
+            for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
+                String key = entry.getKey().toString();
+                setProperty(rc, prefixVendor + prefixFooter + key, entry.getValue());
+            }
+        }
+
+        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat);
+        rc.setBooleanProperty(prefixVendor + "NATIVE", false);
+        return rc;
+    }
+
+    private void setProperty(Message msg, String key, Object value) throws JMSException {
+        if( value instanceof String ) {
+            msg.setStringProperty(key, (String) value);
+//        } else if( value instanceof Integer ) {
+//            msg.setIntProperty(key, ((Integer) value).intValue());
+//        } else if( value instanceof Long ) {
+//            msg.setLongProperty(key, ((Long) value).longValue());
+        } else {
+            throw new RuntimeException("Unexpected value type: "+value.getClass());
+        }
+    }
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java?rev=1393782&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java Wed Oct 3 21:46:46 2012
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.transform;
+
+import javax.jms.BytesMessage;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+
+/**
+ * Outbound transformer that hands back the raw body of a JMS {@link BytesMessage}.
+ * <p>
+ * A message is converted only when it is a {@code BytesMessage} whose vendor-prefixed
+ * "NATIVE" boolean property is true; see {@link #transform(Message)} for the cases
+ * that yield null instead.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class JMSMappingOutboundTransformer extends OutboundTransformer {
+
+    /**
+     * Creates a transformer bound to the given vendor.
+     *
+     * @param vendor the vendor object passed straight to the superclass constructor
+     */
+    public JMSMappingOutboundTransformer(JMSVendor vendor) {
+        super(vendor);
+    }
+
+    /**
+     * Extracts the body bytes of a bytes message flagged with the vendor "NATIVE" property.
+     *
+     * @param jms the JMS message to convert; may be null
+     * @return a new array sized by {@code getBodyLength()} and filled via {@code readBytes},
+     *         or null when {@code jms} is null, is not a {@link BytesMessage}, has a false
+     *         "NATIVE" property, or reading the "NATIVE" / "MESSAGE_FORMAT" properties
+     *         throws {@link MessageFormatException}
+     * @throws Exception any other failure raised while reading the properties or the body
+     */
+    @Override
+    public byte[] transform(Message jms) throws Exception {
+        // instanceof is false for null, so this one test rejects null and non-bytes messages.
+        if (!(jms instanceof BytesMessage)) {
+            return null;
+        }
+        final BytesMessage source = (BytesMessage) jms;
+
+        long messageFormat;
+        try {
+            final boolean nativeFlag = source.getBooleanProperty(prefixVendor + "NATIVE");
+            if (!nativeFlag) {
+                return null;
+            }
+            messageFormat = source.getLongProperty(prefixVendor + "MESSAGE_FORMAT");
+        } catch (MessageFormatException e) {
+            // A property that cannot be converted to the requested type: not a message we handle.
+            return null;
+        }
+
+        // TODO: messageFormat is read but not applied yet; Proton should probably expose
+        // a way to set the message format on the delivery.
+
+        final int length = (int) source.getBodyLength();
+        final byte[] payload = new byte[length];
+        source.readBytes(payload);
+        return payload;
+    }
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java?rev=1393782&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java Wed Oct 3 21:46:46 2012
@@ -0,0 +1,29 @@
+package org.apache.activemq.transport.amqp.transform;
+
+import javax.jms.*;
+
+/**
+ * Provider-specific hooks used by the AMQP transformers in this package.
+ * <p>
+ * A concrete vendor creates JMS messages and destinations and applies the JMSX* values
+ * to a message. How each value is stored is left to the implementation.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class JMSVendor {
+
+    // ---- message factories ------------------------------------------------
+
+    /** @return a {@link Message} created by the vendor implementation */
+    public abstract Message createMessage();
+
+    /** @return a {@link BytesMessage} created by the vendor implementation */
+    public abstract BytesMessage createBytesMessage();
+
+    /** @return a {@link TextMessage} created by the vendor implementation */
+    public abstract TextMessage createTextMessage();
+
+    /** @return a {@link MapMessage} created by the vendor implementation */
+    public abstract MapMessage createMapMessage();
+
+    /** @return a {@link StreamMessage} created by the vendor implementation */
+    public abstract StreamMessage createStreamMessage();
+
+    /** @return an {@link ObjectMessage} created by the vendor implementation */
+    public abstract ObjectMessage createObjectMessage();
+
+    // ---- destinations -----------------------------------------------------
+
+    /**
+     * @param name the destination name; its syntax is defined by the implementation
+     * @return the {@link Destination} the vendor associates with {@code name}
+     */
+    public abstract Destination createDestination(String name);
+
+    // ---- JMSX* values -----------------------------------------------------
+
+    /**
+     * Intended to record {@code value} as the JMSXUserID of {@code jms}.
+     *
+     * @param jms   the message to update
+     * @param value the user id to apply
+     */
+    public abstract void setJMSXUserID(Message jms, String value);
+
+    /**
+     * Intended to record {@code groupId} as the JMSXGroupID of {@code jms}.
+     *
+     * @param jms     the message to update
+     * @param groupId the group id to apply
+     */
+    public abstract void setJMSXGroupID(Message jms, String groupId);
+
+    /**
+     * Intended to record {@code i} as the JMSXGroupSeq of {@code jms}.
+     *
+     * @param jms the message to update
+     * @param i   the group sequence number to apply
+     */
+    public abstract void setJMSXGroupSequence(Message jms, int i);
+}
Copied: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java (from r1393500, activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java?p2=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java&p1=activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java&r1=1393500&r2=1393782&rev=1393782&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java Wed Oct 3 21:46:46 2012
@@ -14,16 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.amqp;
+package org.apache.activemq.transport.amqp.transform;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+import javax.jms.Message;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class InboundTransformer {
+public abstract class OutboundTransformer { // base type for transformers that turn a JMS Message into a byte[] (see transform)
+ JMSVendor vendor; // vendor hooks; assigned by the constructor and by setVendor
String prefixVendor = "JMS_AMQP_";
- int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
- int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
- long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
+ public OutboundTransformer(JMSVendor vendor) { // stores the vendor; prefixVendor keeps its "JMS_AMQP_" default
+ this.vendor = vendor;
+ }
+
+ public abstract byte[] transform(Message jms) throws Exception; // converts jms to its byte[] form; implemented by each concrete transformer
+
+ public String getPrefixVendor() { // prefix prepended to vendor-specific JMS property names
+ return prefixVendor;
+ }
+
+ public void setPrefixVendor(String prefixVendor) { // replaces the default "JMS_AMQP_" prefix
+ this.prefixVendor = prefixVendor;
+ }
+
+ public JMSVendor getVendor() { // returns the vendor currently in use
+ return vendor;
+ }
+
+ public void setVendor(JMSVendor vendor) { // swaps the vendor; no null check is performed
+ this.vendor = vendor;
+ }
}
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java?rev=1393782&r1=1393781&r2=1393782&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java Wed Oct 3 21:46:46 2012
@@ -18,7 +18,7 @@ package org.apache.activemq.transport.am
import org.apache.activemq.broker.BrokerService;
-public class AmqpNioTest extends AmqpTest {
+public class AmqpNioTest extends AmqpTestSupport {
protected void addAMQPConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("amqp+nio://localhost:1883?maxInactivityDuration=-1");
}
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java?rev=1393782&r1=1393781&r2=1393782&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java Wed Oct 3 21:46:46 2012
@@ -28,7 +28,7 @@ import java.security.cert.CertificateExc
import java.security.cert.X509Certificate;
@Ignore("hangs atm, needs investigation")
-public class AmqpSslTest extends AmqpTest {
+public class AmqpSslTest extends AmqpTestSupport {
public void startBroker() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
Copied: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java (from r1393500, activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java?p2=activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java&p1=activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java&r1=1393500&r2=1393782&rev=1393782&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java Wed Oct 3 21:46:46 2012
@@ -16,34 +16,25 @@
*/
package org.apache.activemq.transport.amqp;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
+import junit.framework.TestCase;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.util.ByteSequence;
import org.junit.After;
import org.junit.Before;
-import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-public class AmqpTest {
- protected static final Logger LOG = LoggerFactory.getLogger(AmqpTest.class);
+public class AmqpTestSupport {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class);
protected BrokerService brokerService;
protected Vector<Throwable> exceptions = new Vector<Throwable>();
protected int numberOfMessages;
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java?rev=1393782&r1=1393781&r2=1393782&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java Wed Oct 3 21:46:46 2012
@@ -27,7 +27,7 @@ import org.junit.Test;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class SwiftMQClientTest extends AmqpTest {
+public class SwiftMQClientTest extends AmqpTestSupport {
@Test
public void testSendReceive() throws Exception {
@@ -64,6 +64,7 @@ public class SwiftMQClientTest extends A
p.close();
session.close();
}
+
// {
// Session session = connection.createSession(10, 10);
// Consumer c = session.createConsumer(queue, 100, qos, true, null);
Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=1393782&r1=1393781&r2=1393782&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Wed Oct 3 21:46:46 2012
@@ -1373,6 +1373,12 @@
<profiles>
<profile>
+ <id>unstable</id> <!-- no activation block: activemq-amqp is built only when this profile is selected explicitly, e.g. mvn -Punstable -->
+ <modules>
+ <module>activemq-amqp</module>
+ </modules>
+ </profile>
+ <profile>
<id>apache-release</id>
<activation>
<property>