You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/31 15:17:33 UTC
svn commit: r1404158 - in /activemq/trunk/activemq-amqp/src:
main/java/org/apache/activemq/transport/amqp/
main/java/org/apache/activemq/transport/amqp/transform/
test/java/org/apache/activemq/transport/amqp/joram/
Author: chirino
Date: Wed Oct 31 14:17:32 2012
New Revision: 1404158
URL: http://svn.apache.org/viewvc?rev=1404158&view=rev
Log:
Improved AMQP protocol support. All JORAM Selector tests are now working.
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1404158&r1=1404157&r2=1404158&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Wed Oct 31 14:17:32 2012
@@ -41,6 +41,7 @@ import org.fusesource.hawtbuf.ByteArrayO
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.InvalidSelectorException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
@@ -626,11 +627,13 @@ class AmqpProtocolConverter {
producerInfo.setDestination(dest);
sendToActiveMQ(producerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
- receiver.open();
if (response.isException()) {
- // If the connection attempt fails we close the socket.
+ receiver.setTarget(null);
Throwable exception = ((ExceptionResponse) response).getException();
+ ((LinkImpl)receiver).setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
receiver.close();
+ } else {
+ receiver.open();
}
pumpProtonToSocket();
}
@@ -899,11 +902,17 @@ class AmqpProtocolConverter {
sendToActiveMQ(consumerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
- sender.open();
if (response.isException()) {
+ sender.setSource(null);
Throwable exception = ((ExceptionResponse) response).getException();
- exception.printStackTrace();
+ String name = exception.getClass().getName();
+ if( exception instanceof InvalidSelectorException ) {
+ name = "amqp:invalid-field";
+ }
+ ((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
sender.close();
+ } else {
+ sender.open();
}
pumpProtonToSocket();
}
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java?rev=1404158&r1=1404157&r2=1404158&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java Wed Oct 31 14:17:32 2012
@@ -138,7 +138,11 @@ public abstract class InboundTransformer
if( ma!=null ) {
for (Map.Entry entry : (Set<Map.Entry>)ma.getValue().entrySet()) {
String key = entry.getKey().toString();
- setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
+ if( "x-opt-jms-type".equals(key) ) {
+ jms.setJMSType(entry.getValue().toString());
+ } else {
+ setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
+ }
}
}
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java?rev=1404158&r1=1404157&r2=1404158&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java Wed Oct 31 14:17:32 2012
@@ -47,6 +47,9 @@ public class JoramJmsTest extends TestCa
TestSuite suite = new TestSuite();
// Passing tests
+ suite.addTestSuite(SelectorSyntaxTest.class);
+ suite.addTestSuite(QueueSessionTest.class);
+ suite.addTestSuite(SelectorTest.class);
suite.addTestSuite(TemporaryQueueTest.class);
suite.addTestSuite(ConnectionTest.class);
suite.addTestSuite(SessionTest.class);
@@ -58,10 +61,6 @@ public class JoramJmsTest extends TestCa
if (false ) {
-// TODO: Fails due to selectors not being implemented yet.
- suite.addTestSuite(SelectorSyntaxTest.class);
- suite.addTestSuite(SelectorTest.class);
- suite.addTestSuite(QueueSessionTest.class);
// TODO: Fails due to https://issues.apache.org/jira/browse/PROTON-110 and DestinationImpl vs QueueImpl mapping issues
suite.addTestSuite(MessageHeaderTest.class);
// TODO: Fails due to JMS client setup browser before getEnumeration() gets called.