You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/10/24 15:32:32 UTC
svn commit: r1401674 - in /activemq/trunk/activemq-amqp/src:
main/java/org/apache/activemq/transport/amqp/
main/java/org/apache/activemq/transport/amqp/transform/
test/java/org/apache/activemq/transport/amqp/
Author: dejanb
Date: Wed Oct 24 13:32:32 2012
New Revision: 1401674
URL: http://svn.apache.org/viewvc?rev=1401674&view=rev
Log:
Initial AMQP selector support
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1401674&r1=1401673&r2=1401674&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Wed Oct 24 13:32:32 2012
@@ -22,12 +22,16 @@ import org.apache.activemq.transport.amq
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.qpid.proton.codec.Decoder;
+import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.framing.TransportFrame;
import org.apache.qpid.proton.type.Binary;
+import org.apache.qpid.proton.type.DescribedType;
+import org.apache.qpid.proton.type.Symbol;
import org.apache.qpid.proton.type.messaging.*;
import org.apache.qpid.proton.type.messaging.Modified;
import org.apache.qpid.proton.type.messaging.Rejected;
@@ -359,7 +363,8 @@ class AmqpProtocolConverter {
}
}
- InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+ //InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+ InboundTransformer inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
abstract class BaseProducerContext extends AmqpDeliveryListener {
@@ -776,7 +781,6 @@ class AmqpProtocolConverter {
private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
-
// sender.get
ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
ConsumerContext consumerContext = new ConsumerContext(id, sender);
@@ -810,6 +814,11 @@ class AmqpProtocolConverter {
consumerInfo.setDestination(dest);
consumerInfo.setPrefetchSize(100);
consumerInfo.setDispatchAsync(true);
+ Map filter = ((org.apache.qpid.proton.type.messaging.Source)remoteSource).getFilter();
+ if (filter != null) {
+ DescribedType type = (DescribedType)filter.get(Symbol.valueOf("jms-selector"));
+ consumerInfo.setSelector(type.getDescribed().toString());
+ }
sendToActiveMQ(consumerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java?rev=1401674&r1=1401673&r2=1401674&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java Wed Oct 24 13:32:32 2012
@@ -182,7 +182,7 @@ public class JMSMappingInboundTransforme
}
final ApplicationProperties ap = amqp.getApplicationProperties();
- if( da!=null ) {
+ if( ap !=null ) {
for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(rc, key, entry.getValue());
@@ -190,7 +190,7 @@ public class JMSMappingInboundTransforme
}
final Footer fp = amqp.getFooter();
- if( da!=null ) {
+ if( fp !=null ) {
for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(rc, prefixVendor + prefixFooter + key, entry.getValue());
@@ -203,12 +203,13 @@ public class JMSMappingInboundTransforme
}
private void setProperty(Message msg, String key, Object value) throws JMSException {
+ //TODO support all types
if( value instanceof String ) {
msg.setStringProperty(key, (String) value);
-// } else if( value instanceof Integer ) {
-// msg.setIntProperty(key, ((Integer) value).intValue());
-// } else if( value instanceof Long ) {
-// msg.setLongProperty(key, ((Long) value).longValue());
+ } else if( value instanceof Integer ) {
+ msg.setIntProperty(key, ((Integer) value).intValue());
+ } else if( value instanceof Long ) {
+ msg.setLongProperty(key, ((Long) value).longValue());
} else {
throw new RuntimeException("Unexpected value type: "+value.getClass());
}
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java?rev=1401674&r1=1401673&r2=1401674&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java Wed Oct 24 13:32:32 2012
@@ -24,6 +24,7 @@ import org.junit.Test;
import javax.jms.*;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -39,13 +40,15 @@ public class JMSClientTest extends AmqpT
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = session.createProducer(queue);
- p.send(session.createTextMessage("Hello World"));
+ Message msg = session.createTextMessage("Hello World");
+ msg.setObjectProperty("x", 1);
+ p.send(msg);
// session.commit();
-
- MessageConsumer c = session.createConsumer(queue);
- Message msg = c.receive();
- System.out.println("first:"+msg);
- System.out.println(msg.getJMSRedelivered());
+ MessageConsumer c = session.createConsumer(queue, "x = 1");
+ Message received = c.receive(2000);
+ assertNotNull(received);
+ System.out.println("first: " + ((TextMessage)received).getText());
+ System.out.println(received.getJMSRedelivered());
// session.rollback();
//