You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/29 00:16:44 UTC
svn commit: r1414990 - in /activemq/trunk: ./ activemq-amqp/
activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/
activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/
activemq-amqp/src/test/java/org/apache/activemq/trans...
Author: chirino
Date: Wed Nov 28 23:16:42 2012
New Revision: 1414990
URL: http://svn.apache.org/viewvc?rev=1414990&view=rev
Log:
Moved the JMS mapping logic into a proton module.
Removed:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AutoOutboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/DroppingWritableBuffer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java
Modified:
activemq/trunk/activemq-amqp/pom.xml
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java
activemq/trunk/assembly/src/main/descriptors/common-bin.xml
activemq/trunk/pom.xml
Modified: activemq/trunk/activemq-amqp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/pom.xml?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/pom.xml (original)
+++ activemq/trunk/activemq-amqp/pom.xml Wed Nov 28 23:16:42 2012
@@ -44,7 +44,7 @@
<dependency>
<groupId>org.apache.qpid</groupId>
- <artifactId>proton</artifactId>
+ <artifactId>proton-jms</artifactId>
<version>${qpid-proton-version}</version>
</dependency>
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java Wed Nov 28 23:16:42 2012
@@ -1,7 +1,7 @@
package org.apache.activemq.transport.amqp;
import org.apache.activemq.command.*;
-import org.apache.activemq.transport.amqp.transform.JMSVendor;
+import org.apache.qpid.proton.jms.JMSVendor;
import javax.jms.*;
import javax.jms.Message;
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Wed Nov 28 23:16:42 2012
@@ -18,16 +18,18 @@ package org.apache.activemq.transport.am
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.*;
-import org.apache.activemq.transport.amqp.transform.*;
+import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.impl.*;
import org.apache.qpid.proton.framing.TransportFrame;
+import org.apache.qpid.proton.jms.*;
import org.apache.qpid.proton.type.Binary;
import org.apache.qpid.proton.type.DescribedType;
import org.apache.qpid.proton.type.Symbol;
+import org.apache.qpid.proton.type.UnsignedInteger;
import org.apache.qpid.proton.type.messaging.*;
import org.apache.qpid.proton.type.messaging.Modified;
import org.apache.qpid.proton.type.messaging.Rejected;
@@ -60,6 +62,8 @@ class AmqpProtocolConverter {
private static final Symbol COPY = Symbol.getSymbol("copy");
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
+ private static final UnsignedInteger DURABLE = new UnsignedInteger(2);
+ private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
public AmqpProtocolConverter(AmqpTransport amqpTransport, BrokerContext brokerContext) {
this.amqpTransport = amqpTransport;
@@ -86,7 +90,7 @@ class AmqpProtocolConverter {
// private String clientId;
// private final String QOS_PROPERTY_NAME = "QoSPropertyName";
int prefetch = 100;
- boolean trace = false;
+ boolean trace = true;
TransportImpl protonTransport = new TransportImpl();
ConnectionImpl protonConnection = new ConnectionImpl();
@@ -345,8 +349,6 @@ class AmqpProtocolConverter {
String clientId = protonConnection.getRemoteContainer();
if (clientId != null && !clientId.isEmpty()) {
connectionInfo.setClientId(clientId);
- } else {
- connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
}
@@ -679,6 +681,7 @@ class AmqpProtocolConverter {
private final ConsumerId consumerId;
private final Sender sender;
private boolean presettle;
+ private boolean closed;
public ConsumerContext(ConsumerId consumerId, Sender sender) {
this.consumerId = consumerId;
@@ -714,23 +717,28 @@ class AmqpProtocolConverter {
@Override
public void onClose() throws Exception {
- sendToActiveMQ(new RemoveInfo(consumerId), null);
+ if( !closed ) {
+ closed = true;
+ sendToActiveMQ(new RemoveInfo(consumerId), null);
+ }
}
LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
// called when the connection receives a JMS message from ActiveMQ
public void onMessageDispatch(MessageDispatch md) throws Exception {
- outbound.addLast(md);
- pumpOutbound();
- pumpProtonToSocket();
+ if( !closed ) {
+ outbound.addLast(md);
+ pumpOutbound();
+ pumpProtonToSocket();
+ }
}
Buffer currentBuffer;
Delivery currentDelivery;
public void pumpOutbound() throws Exception {
- while(true) {
+ while(!closed) {
while( currentBuffer !=null ) {
int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
@@ -876,41 +884,94 @@ class AmqpProtocolConverter {
private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
- // sender.get
- ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
+ org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)sender.getRemoteSource();
+
+ final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
ConsumerContext consumerContext = new ConsumerContext(id, sender);
+ sender.setContext(consumerContext);
- subscriptionsByConsumerId.put(id, consumerContext);
+ String selector = null;
+ if( source!=null ) {
+ Map filter = source.getFilter();
+ if (filter != null) {
+ DescribedType value = (DescribedType)filter.get(JMS_SELECTOR);
+ if( value!=null ) {
+ selector = value.getDescribed().toString();
+ // Validate the Selector.
+ try {
+ SelectorParser.parse(selector);
+ } catch (InvalidSelectorException e) {
+ sender.setSource(null);
+ ((LinkImpl)sender).setLocalError(new EndpointError("amqp:invalid-field", e.getMessage()));
+ sender.close();
+ consumerContext.closed = true;
+ return;
+ }
+ }
+ }
+ }
ActiveMQDestination dest;
- org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)sender.getRemoteSource();
- if( source != null && !source.getDynamic() ) {
- dest = createDestination(source);
- } else {
+ if( source == null ) {
+
+ source = new org.apache.qpid.proton.type.messaging.Source();
+ source.setAddress("");
+ source.setCapabilities(DURABLE_SUBSCRIPTION_ENDED);
+ sender.setSource(source);
+
+ // Looks like durable sub removal.
+ RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+ rsi.setConnectionId(connectionId);
+ rsi.setSubscriptionName(sender.getName());
+ rsi.setClientId(connectionInfo.getClientId());
+
+ consumerContext.closed=true;
+ sendToActiveMQ(rsi, new ResponseHandler() {
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+ if (response.isException()) {
+ sender.setSource(null);
+ Throwable exception = ((ExceptionResponse) response).getException();
+ String name = exception.getClass().getName();
+ ((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
+ }
+ sender.open();
+ pumpProtonToSocket();
+ }
+ });
+ return;
+ } else if( contains(source.getCapabilities(), DURABLE_SUBSCRIPTION_ENDED) ) {
+ consumerContext.closed=true;
+ sender.close();
+ pumpProtonToSocket();
+ return;
+ } else if( source.getDynamic() ) {
// lets create a temp dest.
dest = createTempQueue();
source = new org.apache.qpid.proton.type.messaging.Source();
source.setAddress(dest.getQualifiedName());
source.setDynamic(true);
sender.setSource(source);
+ } else {
+ dest = createDestination(source);
}
- sender.setContext(consumerContext);
+ subscriptionsByConsumerId.put(id, consumerContext);
ConsumerInfo consumerInfo = new ConsumerInfo(id);
+ consumerInfo.setSelector(selector);
+ consumerInfo.setNoRangeAcks(true);
consumerInfo.setDestination(dest);
consumerInfo.setPrefetchSize(100);
consumerInfo.setDispatchAsync(true);
if( source.getDistributionMode() == COPY && dest.isQueue() ) {
consumerInfo.setBrowser(true);
}
+ if( DURABLE.equals(source.getDurable()) && dest.isTopic() ) {
+ consumerInfo.setSubscriptionName(sender.getName());
+ }
Map filter = source.getFilter();
if (filter != null) {
- DescribedType value = (DescribedType)filter.get(JMS_SELECTOR);
- if( value!=null ) {
- consumerInfo.setSelector(value.getDescribed().toString());
- }
- value = (DescribedType)filter.get(NO_LOCAL);
+ DescribedType value = (DescribedType)filter.get(NO_LOCAL);
if( value!=null ) {
consumerInfo.setNoLocal(true);
}
@@ -926,6 +987,7 @@ class AmqpProtocolConverter {
name = "amqp:invalid-field";
}
((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
+ subscriptionsByConsumerId.remove(id);
sender.close();
} else {
sender.open();
@@ -936,6 +998,17 @@ class AmqpProtocolConverter {
}
+ static private boolean contains(Symbol[] haystack, Symbol needle) {
+ if( haystack!=null ) {
+ for (Symbol capability : haystack) {
+ if( capability == needle) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
private ActiveMQDestination createTempQueue() {
ActiveMQDestination rc;
rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java Wed Nov 28 23:16:42 2012
@@ -21,7 +21,7 @@ import org.apache.activemq.command.Comma
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.amqp.transform.InboundTransformer;
+import org.apache.qpid.proton.jms.InboundTransformer;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java Wed Nov 28 23:16:42 2012
@@ -20,6 +20,7 @@ import org.apache.activemq.transport.amq
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.junit.Test;
+import org.objectweb.jtests.jms.framework.TestConfig;
import javax.jms.*;
@@ -27,6 +28,7 @@ import java.util.Enumeration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -42,30 +44,21 @@ public class JMSClientTest extends AmqpT
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = session.createProducer(queue);
- Message msg = session.createTextMessage("Hello World");
- msg.setObjectProperty("x", 1);
- p.send(msg);
-// session.commit();
-/* MessageConsumer c = session.createConsumer(queue, "x = 1");
- Message received = c.receive(2000);
- assertNotNull(received);
- System.out.println("first: " + ((TextMessage)received).getText());
- System.out.println(received.getJMSRedelivered());*/
+ TextMessage message = session.createTextMessage();
+ message.setText("hello");
+ p.send(message);
QueueBrowser browser = session.createBrowser(queue);
Enumeration enumeration = browser.getEnumeration();
while (enumeration.hasMoreElements()) {
-
- System.out.println("BROWSE " + enumeration.nextElement());
+ Message m = (Message) enumeration.nextElement();
+ assertTrue(m instanceof TextMessage);
}
-
-// session.rollback();
-//
-// msg = c.receive();
-// System.out.println("second:"+msg);
-// System.out.println(msg.getJMSRedelivered());
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(TestConfig.TIMEOUT);
+ assertTrue(message instanceof TextMessage);
}
connection.close();
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java Wed Nov 28 23:16:42 2012
@@ -52,19 +52,21 @@ public class JoramJmsTest extends TestCa
TestSuite suite = new TestSuite();
// TODO: Fix these tests..
- if (false) {
- // Fails due to durable subs not being implemented.
- suite.addTestSuite(TopicSessionTest.class);
- // Fails due to https://issues.apache.org/jira/browse/PROTON-110 and DestinationImpl vs QueueImpl mapping issues
+ if (true) {
+ // Fails due to https://issues.apache.org/jira/browse/QPID-4454
suite.addTestSuite(MessageHeaderTest.class);
- // Fails due to inconsistent Message mapping in the JMS client.
- suite.addTestSuite(MessageTypeTest.class);
+ // Fails due to https://issues.apache.org/jira/browse/QPID-4455
suite.addTestSuite(QueueBrowserTest.class);
+ }
+ // TODO: enable once QPID 0.21 is released
+ if(true) {
+ suite.addTestSuite(MessageTypeTest.class);
}
+ suite.addTestSuite(TopicSessionTest.class);
// TODO: enable once QPID 0.19 is released
- if(false) {
+ if(true) {
suite.addTestSuite(UnifiedSessionTest.class);
suite.addTestSuite(TemporaryTopicTest.class);
suite.addTestSuite(TopicConnectionTest.class);
Modified: activemq/trunk/assembly/src/main/descriptors/common-bin.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/main/descriptors/common-bin.xml?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/assembly/src/main/descriptors/common-bin.xml (original)
+++ activemq/trunk/assembly/src/main/descriptors/common-bin.xml Wed Nov 28 23:16:42 2012
@@ -183,7 +183,8 @@
<include>org.jasypt:jasypt</include>
<include>org.jasypt:jasypt-spring3</include>
<include>javax.jmdns:jmdns</include>
- <include>org.apache.qpid:qpid-proton</include>
+ <include>org.apache.qpid:proton</include>
+ <include>org.apache.qpid:proton-jms</include>
</includes>
</dependencySet>
<dependencySet>
Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Wed Nov 28 23:16:42 2012
@@ -92,7 +92,7 @@
<org.osgi.core-version>4.2.0</org.osgi.core-version>
<p2psockets-version>1.1.2</p2psockets-version>
<qpid-proton-version>1.0-SNAPSHOT</qpid-proton-version>
- <qpid-jms-version>0.18</qpid-jms-version>
+ <qpid-jms-version>0.21-SNAPSHOT</qpid-jms-version>
<regexp-version>1.3</regexp-version>
<rome-version>1.0</rome-version>
<saxon-version>9.4</saxon-version>