You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/03 16:15:03 UTC
svn commit: r1393500 [1/2] - in /activemq/trunk/activemq-amqp: ./ src/
src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/activemq/
src/main/java/org/apache/activemq/transport/
src/main/java/org/apache/active...
Author: chirino
Date: Wed Oct 3 14:15:01 2012
New Revision: 1393500
URL: http://svn.apache.org/viewvc?rev=1393500&view=rev
Log:
Initial rough cut of AMQP protocol support using the QPID proton project.
Added:
activemq/trunk/activemq-amqp/
activemq/trunk/activemq-amqp/pom.xml
activemq/trunk/activemq-amqp/src/
activemq/trunk/activemq-amqp/src/main/
activemq/trunk/activemq-amqp/src/main/java/
activemq/trunk/activemq-amqp/src/main/java/org/
activemq/trunk/activemq-amqp/src/main/java/org/apache/
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/package.html
activemq/trunk/activemq-amqp/src/main/resources/
activemq/trunk/activemq-amqp/src/main/resources/META-INF/
activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/
activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/
activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/
activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/
activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/
activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp
activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio
activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio+ssl
activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/
activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/amqp
activemq/trunk/activemq-amqp/src/test/
activemq/trunk/activemq-amqp/src/test/java/
activemq/trunk/activemq-amqp/src/test/java/org/
activemq/trunk/activemq-amqp/src/test/java/org/apache/
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java
activemq/trunk/activemq-amqp/src/test/resources/
activemq/trunk/activemq-amqp/src/test/resources/log4j.properties (with props)
Added: activemq/trunk/activemq-amqp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/pom.xml?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/pom.xml (added)
+++ activemq/trunk/activemq-amqp/pom.xml Wed Oct 3 14:15:01 2012
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-parent</artifactId>
+ <version>5.8-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>activemq-amqp</artifactId>
+ <packaging>jar</packaging>
+
+ <name>ActiveMQ :: AMQP</name>
+ <description>ActiveMQ implementation of AMQP messaging protocol</description>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-proton</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf</artifactId>
+ <version>${hawtbuf-version}</version>
+ </dependency>
+
+ <!-- Testing Dependencies -->
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-console</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <!-- profile which is activated if the swiftmq-client-home prop is defined.
+ It tests the amqp broker impl using the swiftmq client libs -->
+ <id>swiftmq-client</id>
+ <activation>
+ <property><name>swiftmq-client-home</name></property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>com.swiftmq</groupId>
+ <artifactId>swiftmq</artifactId>
+ <version>9.2.0</version>
+ <scope>system</scope>
+ <systemPath>${swiftmq-client-home}/jars/swiftmq.jar</systemPath>
+ </dependency>
+ <dependency>
+ <groupId>com.swiftmq</groupId>
+ <artifactId>swiftmq-amqp</artifactId>
+ <version>9.2.0</version>
+ <scope>system</scope>
+ <systemPath>${swiftmq-client-home}/jars/amqp.jar</systemPath>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <id>add-test-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${basedir}/src/test-swiftmq/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+</project>
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+/** Subclass of {@link InboundTransformer} that currently declares no members of its own.
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class AMQPNativeInboundTransformer extends InboundTransformer {
+
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.SslTransportFactory;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An <a href="http://amqp.org/">AMQP</a> over SSL transport factory; wraps each configured transport in an {@link AmqpTransportFilter}.
+ */
+public class AMQPSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
+
+ private BrokerContext brokerContext = null; // assigned in setBrokerService(); null until then
+
+ protected String getDefaultWireFormatType() {
+ return "amqp"; // wire format type name this factory reports as its default
+ }
+
+ @SuppressWarnings("rawtypes") // NOTE(review): no @Override here, unlike serverConfigure below
+
+ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+ transport = new AmqpTransportFilter(transport, format, brokerContext); // wrap the transport in the AMQP filter first
+ IntrospectionSupport.setProperties(transport, options); // apply the supplied options to the filter
+ return super.compositeConfigure(transport, format, options); // then let the parent factory finish configuration
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+ transport = super.serverConfigure(transport, format, options); // parent configuration runs first
+
+ MutexTransport mutex = transport.narrow(MutexTransport.class); // result is null-checked below
+ if (mutex != null) {
+ mutex.setSyncOnCommand(true); // presumably serializes inbound command handling - confirm in MutexTransport
+ }
+
+ return transport;
+ }
+
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerContext = brokerService.getBrokerContext(); // kept so compositeConfigure can pass it to each new filter
+ }
+
+// protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+//
+// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
+// filter.setInactivityMonitor(monitor);
+//
+// return monitor;
+// }
+
+
+ @Override
+ protected boolean isUseInactivityMonitor(Transport transport) {
+ return false; // always false, regardless of the transport passed in
+ }
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.transport.nio.NIOSSLTransport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+
+public class AmqpNioSslTransport extends NIOSSLTransport { // NIOSSLTransport variant that passes received bytes to doConsume()
+
+ public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+ super(wireFormat, socketFactory, remoteLocation, localLocation); // pure delegation to the superclass
+ }
+
+ public AmqpNioSslTransport(WireFormat wireFormat, Socket socket) throws IOException {
+ super(wireFormat, socket); // pure delegation to the superclass
+ }
+
+ @Override
+ protected void initializeStreams() throws IOException {
+ super.initializeStreams();
+ if (inputBuffer.position() != 0 && inputBuffer.hasRemaining()) { // inherited buffer has a non-zero position and bytes remaining
+ serviceRead(); // run a read pass right away instead of waiting for the next event
+ }
+ }
+
+ @Override
+ protected void processCommand(ByteBuffer plain) throws Exception {
+ doConsume(AmqpSupport.toBuffer(plain)); // convert via AmqpSupport.toBuffer, then hand upstream; 'plain' is presumably the decrypted payload - confirm in NIOSSLTransport
+ }
+
+}
\ No newline at end of file
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.SslContext;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.wireformat.WireFormat;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+public class AmqpNioSslTransportFactory extends AmqpNioTransportFactory { // creates AmqpNioSslTransport instances instead of the plain NIO ones
+
+ SSLContext context; // package-private; set in doBind() when a current SslContext exists, otherwise left null
+
+ @Override
+ protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+ return new TcpTransportServer(this, location, serverSocketFactory) {
+ protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+ AmqpNioSslTransport transport = new AmqpNioSslTransport(format, socket);
+ if (context != null) { // only set the SSL context when doBind() captured one
+ transport.setSslContext(context);
+ }
+ return transport;
+ }
+ };
+ }
+
+ @Override
+ protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+ return new AmqpNioSslTransport(wf, socketFactory, location, localLocation); // note: 'context' is not applied on this path
+ }
+
+ @Override
+ public TransportServer doBind(URI location) throws IOException {
+ if (SslContext.getCurrentSslContext() != null) { // capture the SSL context, if one is current, before binding
+ try {
+ context = SslContext.getCurrentSslContext().getSSLContext();
+ } catch (Exception e) {
+ throw new IOException(e); // rethrown as IOException with the original exception as cause
+ }
+ }
+ return super.doBind(location);
+ }
+
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.transport.nio.NIOOutputStream;
+import org.apache.activemq.transport.nio.SelectorManager;
+import org.apache.activemq.transport.nio.SelectorSelection;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+
+import javax.net.SocketFactory;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/**
+ * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO; reads are driven by SelectorManager callbacks.
+ */
+public class AmqpNioTransport extends TcpTransport {
+
+ private SocketChannel channel; // taken from the socket in initializeStreams()
+ private SelectorSelection selection; // selector registration for 'channel'; null until initializeStreams() runs
+
+ private ByteBuffer inputBuffer; // read buffer, reused for every channel.read() in serviceRead()
+
+ public AmqpNioTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+ super(wireFormat, socketFactory, remoteLocation, localLocation);
+ }
+
+ public AmqpNioTransport(WireFormat wireFormat, Socket socket) throws IOException {
+ super(wireFormat, socket);
+ }
+
+ protected void initializeStreams() throws IOException {
+ channel = socket.getChannel();
+ channel.configureBlocking(false); // switch to non-blocking before registering with the SelectorManager
+ // listen for events telling us when the socket is readable.
+ selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
+ public void onSelect(SelectorSelection selection) {
+ if (!isStopped()) { // skip reads once the transport reports stopped
+ serviceRead();
+ }
+ }
+
+ public void onError(SelectorSelection selection, Throwable error) {
+ if (error instanceof IOException) {
+ onException((IOException) error); // IOExceptions are passed through unchanged
+ } else {
+ onException(IOExceptionSupport.create(error)); // anything else is converted to an IOException first
+ }
+ }
+ });
+
+ inputBuffer = ByteBuffer.allocate(8 * 1024); // 8 KiB heap buffer for inbound bytes
+ NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024); // second argument is presumably the output buffer size - confirm in NIOOutputStream
+ this.dataOut = new DataOutputStream(outPutStream);
+ this.buffOut = outPutStream;
+ }
+
+ private void serviceRead() {
+ try {
+
+ while (isStarted()) { // keep reading until the channel has nothing more or the transport is no longer started
+ // read channel
+ int readSize = channel.read(inputBuffer);
+ // channel is closed, cleanup
+ if (readSize == -1) {
+ onException(new EOFException()); // end-of-stream is reported through the exception handler
+ selection.close();
+ break;
+ }
+ // nothing more to read, break
+ if (readSize == 0) {
+ break;
+ }
+
+ inputBuffer.flip(); // switch the buffer from filling to draining
+ doConsume(AmqpSupport.toBuffer(inputBuffer)); // hand the bytes just read to doConsume() via AmqpSupport.toBuffer
+ // clear the buffer
+ inputBuffer.clear();
+
+ }
+ } catch (IOException e) {
+ onException(e);
+ } catch (Throwable e) {
+ onException(IOExceptionSupport.create(e)); // non-IO failures are converted to IOException before reporting
+ }
+ }
+
+ protected void doStart() throws Exception {
+ connect(); // presumably triggers initializeStreams() so that 'selection' is set - confirm in TcpTransport
+ selection.setInterestOps(SelectionKey.OP_READ); // only read readiness is requested
+ selection.enable();
+ }
+
+ protected void doStop(ServiceStopper stopper) throws Exception {
+ try {
+ if (selection != null) { // null when initializeStreams() never ran
+ selection.close();
+ }
+ } finally {
+ super.doStop(stopper); // superclass stop runs even if closing the selection throws
+ }
+ }
+}
\ No newline at end of file
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.nio.NIOTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An <a href="http://amqp.org/">AMQP</a> over NIO transport factory; creates {@link AmqpNioTransport} instances and wraps them in an {@link AmqpTransportFilter}.
+ */
+public class AmqpNioTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
+
+ private BrokerContext brokerContext = null; // assigned in setBrokerService(); null until then
+
+ protected String getDefaultWireFormatType() {
+ return "amqp"; // wire format type name this factory reports as its default
+ }
+
+ protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+ return new TcpTransportServer(this, location, serverSocketFactory) {
+ protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+ return new AmqpNioTransport(format, socket); // each socket handed to createTransport gets its own AMQP NIO transport
+ }
+ };
+ }
+
+ protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+ return new AmqpNioTransport(wf, socketFactory, location, localLocation);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+ transport = super.serverConfigure(transport, format, options); // parent configuration runs first
+
+ MutexTransport mutex = transport.narrow(MutexTransport.class); // result is null-checked below
+ if (mutex != null) {
+ mutex.setSyncOnCommand(true); // presumably serializes inbound command handling - confirm in MutexTransport
+ }
+
+ return transport;
+ }
+
+ @SuppressWarnings("rawtypes") // NOTE(review): no @Override here, unlike serverConfigure above
+ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+ transport = new AmqpTransportFilter(transport, format, brokerContext); // wrap the transport in the AMQP filter first
+ IntrospectionSupport.setProperties(transport, options); // apply the supplied options to the filter
+ return super.compositeConfigure(transport, format, options); // then let the parent factory finish configuration
+ }
+
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerContext = brokerService.getBrokerContext(); // kept so compositeConfigure can pass it to each new filter
+ }
+
+// protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
+// filter.setInactivityMonitor(monitor);
+// return monitor;
+// }
+
+ @Override
+ protected boolean isUseInactivityMonitor(Transport transport) {
+ return false; // always false, regardless of the transport passed in
+ }
+}
+
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,750 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.command.*;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.util.*;
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.Inflater;
+
+class AmqpProtocolConverter {
+
+ public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
+ public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
+ public static final EnumSet<EndpointState> ACTIVE_STATE = EnumSet.of(EndpointState.ACTIVE);
+ public static final EnumSet<EndpointState> CLOSED_STATE = EnumSet.of(EndpointState.CLOSED);
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
+
+ private final AmqpTransport amqpTransport;
+
+ public AmqpProtocolConverter(AmqpTransport amqpTransport, BrokerContext brokerContext) { // NOTE(review): brokerContext is accepted but neither stored nor used by this class.
+ this.amqpTransport = amqpTransport;
+ }
+
+//
+// private static final Buffer PING_RESP_FRAME = new PINGRESP().encode();
+//
+//
+// private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+// private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
+//
+// private final ConcurrentHashMap<ConsumerId, AmqpSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSubscription>();
+// private final ConcurrentHashMap<UTF8Buffer, AmqpSubscription> amqpSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, AmqpSubscription>();
+// private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>();
+// private final Map<Destination, UTF8Buffer> amqpTopicMap = new LRUCache<Destination, UTF8Buffer>();
+// private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>();
+// private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>();
+//
+// private final AtomicBoolean connected = new AtomicBoolean(false);
+// private CONNECT connect;
+// private String clientId;
+// private final String QOS_PROPERTY_NAME = "QoSPropertyName";
+
+
+ TransportImpl protonTransport = new TransportImpl();
+ ConnectionImpl protonConnection = new ConnectionImpl();
+
+ {
+ this.protonTransport.bind(this.protonConnection);
+ }
+
+ /**
+  * Drains all pending output from the proton transport and forwards it to the
+  * AMQP peer in chunks of at most 64 KiB. An IOException raised while writing
+  * is reported through amqpTransport.onException rather than thrown.
+  */
+ void pumpOut() {
+     try {
+         final int size = 1024 * 64;
+         final byte[] data = new byte[size];
+         while (true) {
+             final int count = protonTransport.output(data, 0, size);
+             if (count <= 0) {
+                 break;
+             }
+             // NOTE(review): the same backing array is reused for every chunk; this
+             // assumes sendToAmqp has consumed the buffer before it returns - confirm.
+             final Buffer buffer = new Buffer(data, 0, count);
+             if (LOG.isTraceEnabled()) {
+                 // Frame dumps go through the logger instead of stdout.
+                 LOG.trace("writing: " + buffer);
+             }
+             amqpTransport.sendToAmqp(buffer);
+         }
+     } catch (IOException e) {
+         amqpTransport.onException(e);
+     }
+ }
+
+ /**
+  * Per-session state stored as the context of a proton Session: the broker
+  * SessionId plus the counters used to number producers and consumers
+  * created within that session.
+  */
+ static class AmqpSessionContext {
+     private final SessionId sessionId;
+     long nextProducerId = 0;
+     long nextConsumerId = 0;
+
+     /**
+      * @param connectionId id of the owning broker connection
+      * @param id           session number, unique within the connection
+      */
+     public AmqpSessionContext(ConnectionId connectionId, long id) {
+         // Use the supplied id; a hard-coded -1 would give every session on the
+         // connection the same SessionId.
+         sessionId = new SessionId(connectionId, id);
+     }
+ }
+
+ /**
+  * Feeds a frame received from the AMQP peer into the proton engine and then
+  * reacts to the resulting endpoint state changes, in this order: remote
+  * connection open, newly opened sessions, newly opened links, pending
+  * deliveries, remotely closed links, remotely closed sessions and finally a
+  * remotely closed connection. Any output produced is flushed with pumpOut().
+  *
+  * @param frame the raw bytes read from the peer
+  */
+ public void onAMQPData(Buffer frame) throws IOException, JMSException {
+     if (LOG.isTraceEnabled()) {
+         // Frame dumps go through the logger instead of stdout.
+         LOG.trace("reading: " + frame);
+     }
+
+     try {
+         protonTransport.input(frame.data, frame.offset, frame.length);
+     } catch (Throwable e) {
+         handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
+         // handleException stops the transport, so there is nothing left to process or flush.
+         return;
+     }
+
+     try {
+         // Handle the amqp open..
+         if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED) {
+             onConnectionOpen();
+         }
+
+         // Lets map amqp sessions to openwire sessions..
+         // onSessionOpen opens the session locally, so asking for the head again
+         // yields the next session that is still locally uninitialized.
+         Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
+         while (session != null) {
+             onSessionOpen(session);
+             session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
+         }
+
+         Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
+         while (link != null) {
+             onLinkOpen(link);
+             link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
+         }
+
+         // Hand each pending delivery to the listener stored as its link's context.
+         Delivery delivery = protonConnection.getWorkHead();
+         while (delivery != null) {
+             AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
+             if (listener != null) {
+                 listener.onDelivery(delivery);
+             }
+             delivery = delivery.getWorkNext();
+         }
+
+         // Close links that are still active locally but were closed by the peer.
+         link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
+         while (link != null) {
+             link.close();
+             link = link.next(ACTIVE_STATE, CLOSED_STATE);
+         }
+
+         // Same for sessions.
+         session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
+         while (session != null) {
+             //TODO - close links?
+             session.close();
+             session = session.next(ACTIVE_STATE, CLOSED_STATE);
+         }
+
+         if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
+             protonConnection.close();
+         }
+     } catch (Throwable e) {
+         handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
+     }
+
+     pumpOut();
+ }
+
+ /**
+  * Routes a command coming from the broker: responses go to the handler
+  * registered for their correlation id, message dispatches go to the matching
+  * consumer context, connection errors are passed to handleException, broker
+  * info is ignored, and anything else is logged at debug level.
+  */
+ public void onActiveMQCommand(Command command) throws Exception {
+     if (command.isResponse()) {
+         final Response reply = (Response) command;
+         final ResponseHandler handler = resposeHandlers.remove(Integer.valueOf(reply.getCorrelationId()));
+         if (handler != null) {
+             handler.onResponse(this, reply);
+             return;
+         }
+         // Pass down any unexpected errors. Should this close the connection?
+         if (reply.isException()) {
+             final Throwable failure = ((ExceptionResponse) reply).getException();
+             handleException(failure);
+         }
+         return;
+     }
+
+     if (command.isMessageDispatch()) {
+         final MessageDispatch dispatch = (MessageDispatch) command;
+         final ConsumerContext target = subscriptionsByConsumerId.get(dispatch.getConsumerId());
+         if (target != null) {
+             target.onMessageDispatch(dispatch);
+         }
+         return;
+     }
+
+     if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
+         // Pass down any unexpected async errors. Should this close the connection?
+         final Throwable failure = ((ConnectionError) command).getException();
+         handleException(failure);
+         return;
+     }
+
+     if (!command.isBrokerInfo()) {
+         LOG.debug("Do not know how to process ActiveMQ Command " + command);
+     }
+ }
+
+ private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+ private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
+ private ConnectionInfo connectionInfo = new ConnectionInfo();
+ private long nextSessionId = 0;
+
+ /**
+  * Callback stored as a link's context; onAMQPData invokes it for every
+  * delivery found on the connection's work list for that link.
+  */
+ abstract static class AmqpDeliveryListener {
+     public abstract void onDelivery(Delivery delivery) throws Exception;
+ }
+
+ private void onConnectionOpen() throws AmqpProtocolException {
+ connectionInfo.setResponseRequired(true);
+ connectionInfo.setConnectionId(connectionId);
+// configureInactivityMonitor(connect.keepAlive());
+
+ String clientId = protonConnection.getRemoteContainer();
+ if (clientId != null && !clientId.isEmpty()) {
+ connectionInfo.setClientId(clientId);
+ } else {
+ connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
+ }
+
+
+// String userName = "";
+// if (connect.userName() != null) {
+// userName = connect.userName().toString();
+// }
+// String passswd = "";
+// if (connect.password() != null) {
+// passswd = connect.password().toString();
+// }
+// connectionInfo.setUserName(userName);
+// connectionInfo.setPassword(passswd);
+
+ connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
+
+ sendToActiveMQ(connectionInfo, new ResponseHandler() {
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+
+ protonConnection.open();
+ pumpOut();
+
+ if (response.isException()) {
+ Throwable exception = ((ExceptionResponse) response).getException();
+// TODO: figure out how to close /w an error.
+// protonConnection.setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
+ protonConnection.close();
+ pumpOut();
+ amqpTransport.onException(IOExceptionSupport.create(exception));
+ return;
+ }
+
+ }
+ });
+ }
+
+ private void onSessionOpen(Session session) {
+ AmqpSessionContext sessionContext = new AmqpSessionContext(connectionId, nextSessionId++);
+ session.setContext(sessionContext);
+ sendToActiveMQ(new SessionInfo(sessionContext.sessionId), null);
+ session.open();
+ }
+
+ private void onLinkOpen(Link link) {
+ link.setLocalSourceAddress(link.getRemoteSourceAddress());
+ link.setLocalTargetAddress(link.getRemoteTargetAddress());
+
+ AmqpSessionContext sessionContext = (AmqpSessionContext) link.getSession().getContext();
+ if (link instanceof Receiver) {
+ onReceiverOpen((Receiver) link, sessionContext);
+ } else {
+ onSenderOpen((Sender) link, sessionContext);
+ }
+ }
+
+ /**
+  * Delivery listener attached to a receiver link. Each incoming delivery is
+  * read into an ActiveMQBytesMessage addressed to this context's destination
+  * and forwarded to the broker under this context's producer id.
+  */
+ class ProducerContext extends AmqpDeliveryListener {
+     private final ProducerId producerId;
+     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+     private final ActiveMQDestination destination;
+
+     // Message currently being filled; null means the next delivery starts a new one.
+     ActiveMQBytesMessage current;
+
+     public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
+         this.producerId = producerId;
+         this.destination = destination;
+     }
+
+     /**
+      * Converts the delivery to a broker message and sends it without
+      * requesting a response.
+      */
+     @Override
+     public void onDelivery(Delivery delivery) throws JMSException {
+         ActiveMQMessage message = convertMessage((DeliveryImpl) delivery);
+         // This message is about to be handed to the broker. Without the reset,
+         // every later delivery would keep appending to the already-sent message.
+         current = null;
+         message.setProducerId(producerId);
+         message.onSend();
+         sendToActiveMQ(message, null);
+     }
+
+     /**
+      * Reads the bytes currently available on the delivery's link into the
+      * message being assembled.
+      */
+     ActiveMQMessage convertMessage(DeliveryImpl delivery) throws JMSException {
+         ActiveMQBytesMessage msg = nextMessage(delivery);
+         final Receiver receiver = (Receiver) delivery.getLink();
+         final byte[] buff = new byte[1024 * 4];
+         int count;
+         // Stop as soon as recv yields no bytes; a ">= 0" test would keep
+         // looping on a zero-length read without making progress.
+         while ((count = receiver.recv(buff, 0, buff.length)) > 0) {
+             msg.writeBytes(buff, 0, count);
+         }
+         return msg;
+     }
+
+     /**
+      * Returns the message being assembled, creating and initializing a new
+      * one (destination, producer id, message id, timestamp, default
+      * priority) when none is in progress.
+      */
+     private ActiveMQBytesMessage nextMessage(DeliveryImpl delivery) throws JMSException {
+         if (current == null) {
+             current = new ActiveMQBytesMessage();
+             current.setJMSDestination(destination);
+             current.setProducerId(producerId);
+             current.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
+             current.setTimestamp(System.currentTimeMillis());
+             current.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
+         }
+         return current;
+     }
+ }
+
+
+ /**
+  * Handles a newly attached receiver link, i.e. a client that wants to
+  * produce messages. Registers a broker producer for the link and opens the
+  * link once the broker has answered; the link is closed again if the broker
+  * rejected the producer.
+  *
+  * @param receiver       the proton receiver link being opened
+  * @param sessionContext context of the session that owns the link
+  */
+ void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
+     // Client is producing to this receiver object
+     ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
+     // For a link the client sends on, the destination is named by the link's
+     // target; the source describes the client side.
+     ActiveMQDestination destination = ActiveMQDestination.createDestination(receiver.getRemoteTargetAddress(), ActiveMQDestination.QUEUE_TYPE);
+     ProducerContext producerContext = new ProducerContext(producerId, destination);
+
+     receiver.setContext(producerContext);
+     receiver.flow(1024 * 64);
+
+     ProducerInfo producerInfo = new ProducerInfo(producerId);
+     producerInfo.setDestination(destination);
+     sendToActiveMQ(producerInfo, new ResponseHandler() {
+         public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+             receiver.open();
+             if (response.isException()) {
+                 // The broker refused the producer, so close the link again.
+                 receiver.close();
+             }
+             pumpOut();
+         }
+     });
+ }
+
+
+ class ConsumerContext extends AmqpDeliveryListener {
+ private final ConsumerId consumerId;
+ private final Sender sender;
+
+ long nextTagId = 0;
+ HashSet<byte[]> tagCache = new HashSet<byte[]>();
+
+ /**
+  * Returns a delivery tag: one taken out of tagCache when the cache holds
+  * any, otherwise the UTF-8 bytes of the next tag counter value in hex.
+  */
+ byte[] nextTag() {
+     if (tagCache == null || tagCache.isEmpty()) {
+         try {
+             return Long.toHexString(nextTagId++).getBytes("UTF-8");
+         } catch (UnsupportedEncodingException e) {
+             throw new RuntimeException(e);
+         }
+     }
+     final Iterator<byte[]> cached = tagCache.iterator();
+     final byte[] tag = cached.next();
+     cached.remove();
+     return tag;
+ }
+
+ public ConsumerContext(ConsumerId consumerId, Sender sender) { // Binds the broker consumer id to the proton sender link that onMessageDispatch writes to.
+ this.consumerId = consumerId;
+ this.sender = sender;
+ }
+
+ // Called when the connection receives a JMS message from ActiveMQ; sends it over the sender link as a new delivery.
+ public void onMessageDispatch(MessageDispatch md) throws Exception {
+ final byte[] tag = nextTag();
+ final Delivery delivery = sender.delivery(tag, 0, tag.length);
+ delivery.setContext(md); // kept so onDelivery can look the dispatch up again via delivery.getContext()
+
+ // Convert to an AMQP message.
+ org.apache.qpid.proton.message.Message msg = convertMessage(md.getMessage());
+ byte buffer[] = new byte[1024*4];
+ int c=0;
+
+ // And send the AMQP message over the link.
+ while( (c=msg.encode(buffer, 0 , 0)) >= 0 ) { // NOTE(review): the length argument is 0, not buffer.length, and the loop only ends on a negative return - confirm against the proton encode contract.
+ sender.send(buffer, 0, c);
+ }
+ sender.advance();
+
+ }
+
+ /**
+  * Converts a broker message into a proton message.
+  * <p>
+  * The body bytes are extracted according to the message type: text and map
+  * bodies are rendered as UTF-8, bytes bodies are copied, and any other type
+  * uses the raw content, inflated first when the message is compressed.
+  * NOTE(review): the extracted content is not attached to the returned
+  * message yet; a freshly created, empty proton message is returned.
+  *
+  * @param message the broker message being dispatched
+  * @return a newly created proton message
+  * @throws Exception if the body cannot be read or inflated
+  */
+ public org.apache.qpid.proton.message.Message convertMessage(Message message) throws Exception {
+     Buffer content = null;
+     if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
+         ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
+         msg.setReadOnlyBody(true);
+         String messageText = msg.getText();
+         // A text message may carry no body at all; treat that as empty content.
+         content = messageText == null ? new Buffer(0) : new Buffer(messageText.getBytes("UTF-8"));
+     } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
+         ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
+         msg.setReadOnlyBody(true);
+         byte[] data = new byte[(int) msg.getBodyLength()];
+         msg.readBytes(data);
+         content = new Buffer(data);
+     } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
+         ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
+         msg.setReadOnlyBody(true);
+         Map map = msg.getContentMap();
+         content = new Buffer(map.toString().getBytes("UTF-8"));
+     } else {
+         ByteSequence byteSequence = message.getContent();
+         if (byteSequence != null && byteSequence.getLength() > 0) {
+             if (message.isCompressed()) {
+                 Inflater inflater = new Inflater();
+                 try {
+                     inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
+                     byte[] data = new byte[4096];
+                     int read;
+                     ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                     while ((read = inflater.inflate(data)) != 0) {
+                         bytesOut.write(data, 0, read);
+                     }
+                     byteSequence = bytesOut.toByteSequence();
+                 } finally {
+                     // Release the native zlib state now instead of waiting for finalization.
+                     inflater.end();
+                 }
+             }
+             content = new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length);
+         } else {
+             content = new Buffer(0);
+         }
+     }
+
+     org.apache.qpid.proton.message.Message result = new org.apache.qpid.proton.message.Message();
+     return result;
+ }
+
+
+ @Override
+ public void onDelivery(Delivery delivery) throws JMSException { // Invoked for deliveries on the sender link; only inspects deliveries the peer has settled.
+ if( delivery.remotelySettled() ) {
+ MessageDispatch md = (MessageDispatch) delivery.getContext(); // NOTE(review): md is looked up but never used; nothing is acknowledged to the broker here.
+ }
+ }
+
+ }
+
+ private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
+
+ /**
+  * Handles a newly attached sender link, i.e. a client that wants to consume
+  * messages. Registers a broker consumer on the queue named by the link's
+  * remote source address and opens the link once the broker has answered. If
+  * the broker rejects the consumer, the link is closed again and the
+  * subscription entry is dropped.
+  *
+  * @param sender         the proton sender link being opened
+  * @param sessionContext context of the session that owns the link
+  */
+ void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
+     final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
+     ConsumerContext consumerContext = new ConsumerContext(id, sender);
+
+     // Register before contacting the broker so that dispatches can be routed
+     // as soon as the consumer exists.
+     subscriptionsByConsumerId.put(id, consumerContext);
+
+     ActiveMQDestination destination = ActiveMQDestination.createDestination(sender.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE);
+
+     sender.setContext(consumerContext);
+     ConsumerInfo consumerInfo = new ConsumerInfo(id);
+     consumerInfo.setDestination(destination);
+     consumerInfo.setPrefetchSize(100);
+     consumerInfo.setDispatchAsync(true);
+
+     sendToActiveMQ(consumerInfo, new ResponseHandler() {
+         public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+             sender.open();
+             if (response.isException()) {
+                 // The broker refused the consumer: forget the subscription so the
+                 // map does not keep a stale entry, then close the link.
+                 subscriptionsByConsumerId.remove(id);
+                 sender.close();
+             }
+             pumpOut();
+         }
+     });
+ }
+
+//
+// QoS onSubscribe(SUBSCRIBE command, Topic topic) throws AmqpProtocolException {
+// ActiveMQDestination destination = new ActiveMQTopic(convertAMQPToActiveMQ(topic.name().toString()));
+// if (destination == null) {
+// throw new AmqpProtocolException("Invalid Destination.");
+// }
+//
+// ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+// ConsumerInfo consumerInfo = new ConsumerInfo(id);
+// consumerInfo.setDestination(destination);
+// consumerInfo.setPrefetchSize(1000);
+// consumerInfo.setDispatchAsync(true);
+// if (!connect.cleanSession() && (connect.clientId() != null)) {
+// //by default subscribers are persistent
+// consumerInfo.setSubscriptionName(connect.clientId().toString());
+// }
+//
+// AmqpSubscription amqpSubscription = new AmqpSubscription(this, topic.qos(), consumerInfo);
+//
+//
+// amqpSubscriptionByTopic.put(topic.name(), amqpSubscription);
+//
+// sendToActiveMQ(consumerInfo, null);
+// return topic.qos();
+// }
+//
+// void onUnSubscribe(UNSUBSCRIBE command) {
+// UTF8Buffer[] topics = command.topics();
+// if (topics != null) {
+// for (int i = 0; i < topics.length; i++) {
+// onUnSubscribe(topics[i]);
+// }
+// }
+// UNSUBACK ack = new UNSUBACK();
+// ack.messageId(command.messageId());
+// pumpOut(ack.encode());
+//
+// }
+//
+// void onUnSubscribe(UTF8Buffer topicName) {
+// AmqpSubscription subs = amqpSubscriptionByTopic.remove(topicName);
+// if (subs != null) {
+// ConsumerInfo info = subs.getConsumerInfo();
+// if (info != null) {
+// subscriptionsByConsumerId.remove(info.getConsumerId());
+// }
+// RemoveInfo removeInfo = info.createRemoveCommand();
+// sendToActiveMQ(removeInfo, null);
+// }
+// }
+//
+//
+// /**
+// * Dispatch a ActiveMQ command
+// */
+//
+//
+//
+// void onAMQPPublish(PUBLISH command) throws IOException, JMSException {
+// checkConnected();
+// }
+//
+// void onAMQPPubAck(PUBACK command) {
+// short messageId = command.messageId();
+// MessageAck ack;
+// synchronized (consumerAcks) {
+// ack = consumerAcks.remove(messageId);
+// }
+// if (ack != null) {
+// amqpTransport.sendToActiveMQ(ack);
+// }
+// }
+//
+// void onAMQPPubRec(PUBREC commnand) {
+// //from a subscriber - send a PUBREL in response
+// PUBREL pubrel = new PUBREL();
+// pubrel.messageId(commnand.messageId());
+// pumpOut(pubrel.encode());
+// }
+//
+// void onAMQPPubRel(PUBREL command) {
+// PUBREC ack;
+// synchronized (publisherRecs) {
+// ack = publisherRecs.remove(command.messageId());
+// }
+// if (ack == null) {
+// LOG.warn("Unknown PUBREL: " + command.messageId() + " received");
+// }
+// PUBCOMP pubcomp = new PUBCOMP();
+// pubcomp.messageId(command.messageId());
+// pumpOut(pubcomp.encode());
+// }
+//
+// void onAMQPPubComp(PUBCOMP command) {
+// short messageId = command.messageId();
+// MessageAck ack;
+// synchronized (consumerAcks) {
+// ack = consumerAcks.remove(messageId);
+// }
+// if (ack != null) {
+// amqpTransport.sendToActiveMQ(ack);
+// }
+// }
+//
+//
+//
+//
+// public AmqpTransport amqpTransport {
+// return amqpTransport;
+// }
+//
+//
+//
+// void configureInactivityMonitor(short heartBeat) {
+// try {
+//
+// int heartBeatMS = heartBeat * 1000;
+// AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
+// monitor.setProtocolConverter(this);
+// monitor.setReadCheckTime(heartBeatMS);
+// monitor.setInitialDelayTime(heartBeatMS);
+// monitor.startMonitorThread();
+//
+// } catch (Exception ex) {
+// LOG.warn("Failed to start AMQP InactivityMonitor ", ex);
+// }
+//
+// LOG.debug(getClientId() + " AMQP Connection using heart beat of " + heartBeat + " secs");
+// }
+//
+//
+//
+// void checkConnected() throws AmqpProtocolException {
+// if (!connected.get()) {
+// throw new AmqpProtocolException("Not connected.");
+// }
+// }
+//
+// private String getClientId() {
+// if (clientId == null) {
+// if (connect != null && connect.clientId() != null) {
+// clientId = connect.clientId().toString();
+// }
+// } else {
+// clientId = "";
+// }
+// return clientId;
+// }
+//
+// private void stopTransport() {
+// try {
+// amqpTransport.stop();
+// } catch (Throwable e) {
+// LOG.debug("Failed to stop AMQP transport ", e);
+// }
+// }
+//
+// ResponseHandler createResponseHandler(final PUBLISH command) {
+//
+// if (command != null) {
+// switch (command.qos()) {
+// case AT_LEAST_ONCE:
+// return new ResponseHandler() {
+// public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+// if (response.isException()) {
+// LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse) response).getException());
+// } else {
+// PUBACK ack = new PUBACK();
+// ack.messageId(command.messageId());
+// converter.amqpTransport.sendToAmqp(ack.encode());
+// }
+// }
+// };
+// case EXACTLY_ONCE:
+// return new ResponseHandler() {
+// public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+// if (response.isException()) {
+// LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse) response).getException());
+// } else {
+// PUBREC ack = new PUBREC();
+// ack.messageId(command.messageId());
+// synchronized (publisherRecs) {
+// publisherRecs.put(command.messageId(), ack);
+// }
+// converter.amqpTransport.sendToAmqp(ack.encode());
+// }
+// }
+// };
+// case AT_MOST_ONCE:
+// break;
+// }
+// }
+// return null;
+// }
+//
+// private String convertAMQPToActiveMQ(String name) {
+// String result = name.replace('#', '>');
+// result = result.replace('+', '*');
+// result = result.replace('/', '.');
+// return result;
+// }
+
+ ////////////////////////////////////////////////////////////////////////////
+ //
+ // Implementation methods
+ //
+ ////////////////////////////////////////////////////////////////////////////
+
+ private final Object commnadIdMutex = new Object();
+ private int lastCommandId;
+
+ int generateCommandId() { // Returns the current counter value and then increments it, while holding commnadIdMutex.
+ synchronized (commnadIdMutex) {
+ return lastCommandId++;
+ }
+ }
+
+ private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
+
+ void sendToActiveMQ(Command command, ResponseHandler handler) {
+ command.setCommandId(generateCommandId());
+ if (handler != null) {
+ command.setResponseRequired(true);
+ resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
+ }
+ amqpTransport.sendToActiveMQ(command);
+ }
+
+ /**
+  * Logs the failure and stops the AMQP transport. A failure while stopping
+  * is logged and otherwise ignored.
+  *
+  * @param exception the error that makes the connection unusable
+  */
+ void handleException(Throwable exception) {
+     // Report through the logger rather than printing the stack trace to stderr.
+     LOG.warn("Exception occurred processing AMQP connection, stopping transport", exception);
+     try {
+         amqpTransport.stop();
+     } catch (Throwable e) {
+         LOG.error("Failed to stop AMQP Transport ", e);
+     }
+ }
+
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import java.io.IOException;
+
+
+/**
+ * IOException raised for AMQP protocol problems. Carries a flag telling
+ * whether the problem is fatal.
+ */
+public class AmqpProtocolException extends IOException {
+
+    private static final long serialVersionUID = -2869735532997332242L;
+
+    private final boolean fatal;
+
+    /** Creates a non-fatal exception without message or cause. */
+    public AmqpProtocolException() {
+        this(null, false, null);
+    }
+
+    /** Creates a non-fatal exception with the given message. */
+    public AmqpProtocolException(String s) {
+        this(s, false, null);
+    }
+
+    /** Creates an exception with the given message and fatal flag. */
+    public AmqpProtocolException(String s, boolean fatal) {
+        this(s, fatal, null);
+    }
+
+    /** Creates an exception with the given message, fatal flag and cause. */
+    public AmqpProtocolException(String s, boolean fatal, Throwable cause) {
+        super(s, cause);
+        this.fatal = fatal;
+    }
+
+    /** @return the fatal flag given at construction (false when not given) */
+    public boolean isFatal() {
+        return fatal;
+    }
+
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.command.*;
+
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.util.zip.DataFormatException;
+
+/**
+ * Keeps track of the AMQP client subscription so that acking is correctly done.
+ * The implementation is currently commented out, so for now this class is an empty placeholder.
+ */
+class AmqpSubscription {
+// private final AmqpProtocolConverter protocolConverter;
+//
+// private final ConsumerInfo consumerInfo;
+// private ActiveMQDestination destination;
+// private final QoS qos;
+//
+// public AmqpSubscription(AmqpProtocolConverter protocolConverter, QoS qos, ConsumerInfo consumerInfo) {
+// this.protocolConverter = protocolConverter;
+// this.consumerInfo = consumerInfo;
+// this.qos = qos;
+// }
+//
+// MessageAck createMessageAck(MessageDispatch md) {
+// return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+// }
+//
+// PUBLISH createPublish(ActiveMQMessage message) throws DataFormatException, IOException, JMSException {
+// PUBLISH publish = protocolConverter.convertMessage(message);
+// if (publish.qos().ordinal() > this.qos.ordinal()) {
+// publish.qos(this.qos);
+// }
+// return publish;
+// }
+//
+// public boolean expectAck() {
+// return qos != QoS.AT_MOST_ONCE;
+// }
+//
+// public void setDestination(ActiveMQDestination destination) {
+// this.destination = destination;
+// }
+//
+// public ActiveMQDestination getDestination() {
+// return destination;
+// }
+//
+// public ConsumerInfo getConsumerInfo() {
+// return consumerInfo;
+// }
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.fusesource.hawtbuf.Buffer;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpSupport {
+
+    /**
+     * Converts the remaining bytes of a ByteBuffer into a hawtbuf Buffer and
+     * leaves the ByteBuffer fully consumed. For a direct buffer the bytes are
+     * copied into a newly allocated Buffer; otherwise the Buffer is built
+     * directly from the ByteBuffer.
+     *
+     * @param data the source buffer, may be null
+     * @return the resulting Buffer, or null when data is null
+     */
+    public static Buffer toBuffer(ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+        if (!data.isDirect()) {
+            final Buffer wrapped = new Buffer(data);
+            // Mark everything as read, matching what the bulk get does in the direct case.
+            data.position(data.position() + data.remaining());
+            return wrapped;
+        }
+        final Buffer copied = new Buffer(data.remaining());
+        data.get(copied.data);
+        return copied;
+    }
+
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.command.Command;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.IOException;
+import java.security.cert.X509Certificate;
+
+/**
+ * Basic interface that mediates between the protocol converter and the transport:
+ * commands travel towards ActiveMQ through {@link #sendToActiveMQ(Command)} and
+ * AMQP data travels back to the peer through {@link #sendToAmqp(Buffer)}.
+ */
+public interface AmqpTransport {
+
+    /**
+     * Passes a command on to the ActiveMQ side of the transport.
+     *
+     * @param command the command to deliver
+     */
+    void sendToActiveMQ(Command command);
+
+    /**
+     * Passes a buffer on to the AMQP side of the transport.
+     *
+     * @param command the data to send
+     * @throws IOException if the data cannot be sent
+     */
+    void sendToAmqp(Buffer command) throws IOException;
+
+    /**
+     * @return the certificates presented by the peer; {@link AmqpTransportFilter}
+     *         returns {@code null} when the underlying transport is not an SSL transport
+     */
+    X509Certificate[] getPeerCertificates();
+
+    /**
+     * Reports an I/O failure to the transport.
+     *
+     * @param error the failure to report
+     */
+    void onException(IOException error);
+
+// public AmqpInactivityMonitor getInactivityMonitor();
+
+    /**
+     * @return the AMQP wire format associated with this transport
+     */
+    AmqpWireFormat getWireFormat();
+
+    /**
+     * Stops the transport.
+     *
+     * @throws Exception if stopping fails
+     */
+    void stop() throws Exception;
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Transport factory for the <a href="http://amqp.org/">AMQP</a> protocol. It extends the
+ * TCP transport factory, selects the "amqp" wire format by default and places an
+ * {@link AmqpTransportFilter} on top of every transport it configures.
+ */
+public class AmqpTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
+
+    /** Broker context handed to each filter; stays null until {@link #setBrokerService} is called. */
+    private BrokerContext brokerContext;
+
+    /**
+     * @return the wire format type name used when none is configured, "amqp"
+     */
+    protected String getDefaultWireFormatType() {
+        return "amqp";
+    }
+
+    /**
+     * Wraps the transport in an {@link AmqpTransportFilter}, applies the supplied options
+     * to that filter and then lets the superclass finish the composite configuration.
+     */
+    @SuppressWarnings("rawtypes")
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+        final Transport amqpFilter = new AmqpTransportFilter(transport, format, brokerContext);
+        IntrospectionSupport.setProperties(amqpFilter, options);
+        return super.compositeConfigure(amqpFilter, format, options);
+    }
+
+    /**
+     * Remembers the broker context of the given broker service for later filters.
+     */
+    public void setBrokerService(BrokerService brokerService) {
+        brokerContext = brokerService.getBrokerContext();
+    }
+
+    /**
+     * Runs the superclass server configuration and, when the resulting chain contains a
+     * {@link MutexTransport}, switches on its sync-on-command flag.
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+        final Transport configured = super.serverConfigure(transport, format, options);
+
+        final MutexTransport mutexTransport = configured.narrow(MutexTransport.class);
+        if (mutexTransport != null) {
+            mutexTransport.setSyncOnCommand(true);
+        }
+
+        return configured;
+    }
+
+// @Override
+// protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+//
+// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
+// filter.setInactivityMonitor(monitor);
+//
+// return monitor;
+// }
+
+    /**
+     * @return always {@code false}; the inactivity monitor support above is commented out
+     */
+    @Override
+    protected boolean isUseInactivityMonitor(Transport transport) {
+        return false;
+    }
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.tcp.SslTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.security.cert.X509Certificate;
+
+/**
+ * The AmqpTransportFilter normally sits on top of a TcpTransport that has been
+ * configured with the AmqpWireFormat and is used to convert AMQP data to
+ * ActiveMQ commands and back. All of the conversion work is done by delegating
+ * to the AmqpProtocolConverter.
+ */
+public class AmqpTransportFilter extends TransportFilter implements AmqpTransport {
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);
+    private static final Logger TRACE = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".AMQPIO");
+    private final AmqpProtocolConverter protocolConverter;
+// private AmqpInactivityMonitor monitor;
+    private AmqpWireFormat wireFormat;
+
+    /** When set, received and sent data is written to the TRACE logger. */
+    private boolean trace;
+
+    /**
+     * Creates the filter and its protocol converter.
+     *
+     * @param next the transport this filter wraps
+     * @param wireFormat kept only when it is an {@link AmqpWireFormat}; otherwise
+     *        {@link #getWireFormat()} returns {@code null}
+     * @param brokerContext passed through to the protocol converter
+     */
+    public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
+        super(next);
+        this.protocolConverter = new AmqpProtocolConverter(this, brokerContext);
+        this.wireFormat = (wireFormat instanceof AmqpWireFormat) ? (AmqpWireFormat) wireFormat : null;
+    }
+
+    /**
+     * Hands an outgoing ActiveMQ command to the protocol converter. Any exception,
+     * including a failed cast of {@code o}, is rethrown as an {@link IOException}.
+     */
+    public void oneway(Object o) throws IOException {
+        try {
+            protocolConverter.onActiveMQCommand((Command) o);
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Hands incoming AMQP data to the protocol converter and routes failures to the
+     * exception handlers instead of letting them propagate.
+     */
+    public void onCommand(Object command) {
+        try {
+            if (trace) {
+                TRACE.trace("Received: \n" + command);
+            }
+
+            final Buffer amqpData = (Buffer) command;
+            protocolConverter.onAMQPData(amqpData);
+        } catch (IOException e) {
+            handleException(e);
+        } catch (JMSException e) {
+            final IOException wrapped = IOExceptionSupport.create(e);
+            onException(wrapped);
+        }
+    }
+
+    /**
+     * Delivers a command to the transport listener, if one is set.
+     */
+    public void sendToActiveMQ(Command command) {
+        // Read the field once so the null check and the call use the same reference.
+        final TransportListener listener = transportListener;
+        if (listener == null) {
+            return;
+        }
+        listener.onCommand(command);
+    }
+
+    /**
+     * Sends AMQP data through the wrapped transport, if there is one.
+     */
+    public void sendToAmqp(Buffer command) throws IOException {
+        if (trace) {
+            TRACE.trace("Sending: \n" + command);
+        }
+        // Read the field once so the null check and the call use the same reference.
+        final Transport target = next;
+        if (target == null) {
+            return;
+        }
+        target.oneway(command);
+    }
+
+    /**
+     * @return the peer certificates of the wrapped {@link SslTransport}, or {@code null}
+     *         when the wrapped transport is not an {@link SslTransport}
+     */
+    public X509Certificate[] getPeerCertificates() {
+        if (!(next instanceof SslTransport)) {
+            return null;
+        }
+        final X509Certificate[] certificates = ((SslTransport) next).getPeerCertificates();
+        if (trace && certificates != null) {
+            LOG.debug("Peer Identity has been verified\n");
+        }
+        return certificates;
+    }
+
+    /**
+     * @return whether tracing of received and sent data is enabled
+     */
+    public boolean isTrace() {
+        return trace;
+    }
+
+    /**
+     * @param trace {@code true} to log received and sent data to the TRACE logger
+     */
+    public void setTrace(boolean trace) {
+        this.trace = trace;
+    }
+
+// @Override
+// public AmqpInactivityMonitor getInactivityMonitor() {
+// return monitor;
+// }
+//
+// public void setInactivityMonitor(AmqpInactivityMonitor monitor) {
+// this.monitor = monitor;
+// }
+
+    /**
+     * @return the wire format given at construction, or {@code null} if that was not
+     *         an {@link AmqpWireFormat}
+     */
+    @Override
+    public AmqpWireFormat getWireFormat() {
+        return this.wireFormat;
+    }
+
+    /**
+     * Forwards the exception to the superclass exception handling.
+     */
+    public void handleException(IOException e) {
+        super.onException(e);
+    }
+
+
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.*;
+
+/**
+ * {@link WireFormat} that moves raw AMQP bytes on and off the wire as hawtbuf
+ * {@link Buffer}s.
+ * <p>
+ * Reading is stateful: the first call to {@link #unmarshal(DataInput)} returns an
+ * 8 byte buffer (the protocol header), every later call returns one frame whose
+ * first 4 bytes are the frame size that was read from the stream.
+ */
+public class AmqpWireFormat implements WireFormat {
+
+    /** Number of bytes taken by the size field that starts every frame. */
+    private static final int FRAME_SIZE_FIELD_LENGTH = 4;
+
+    private int version = 1;
+    /** Largest frame size accepted by {@link #unmarshal(DataInput)}. */
+    private long maxFrameLength = 1024*1024*100;
+
+    /**
+     * Marshals a command into an in-memory byte sequence.
+     *
+     * @param command the {@link Buffer} to write
+     * @return the bytes written by {@link #marshal(Object, DataOutput)}
+     * @throws IOException if writing fails
+     */
+    public ByteSequence marshal(Object command) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        marshal(command, dos);
+        dos.close();
+        return baos.toByteSequence();
+    }
+
+    /**
+     * Unmarshals from an in-memory byte sequence; see {@link #unmarshal(DataInput)}.
+     *
+     * @param packet the bytes to read from
+     * @return the {@link Buffer} that was read
+     * @throws IOException if reading fails or the frame size is rejected
+     */
+    public Object unmarshal(ByteSequence packet) throws IOException {
+        ByteArrayInputStream stream = new ByteArrayInputStream(packet);
+        DataInputStream dis = new DataInputStream(stream);
+        return unmarshal(dis);
+    }
+
+    /**
+     * Writes the command, which must be a {@link Buffer}, to the output unchanged.
+     *
+     * @param command the {@link Buffer} to write
+     * @param dataOut the destination
+     * @throws IOException if writing fails
+     */
+    public void marshal(Object command, DataOutput dataOut) throws IOException {
+        Buffer frame = (Buffer) command;
+        frame.writeTo(dataOut);
+    }
+
+    /** Set once the initial 8 byte header has been consumed from the stream. */
+    boolean magicRead = false;
+
+    /**
+     * Reads the next unit from the stream: the 8 byte header on the first call,
+     * one size-prefixed frame on every later call.
+     *
+     * @param dataIn the stream to read from
+     * @return a {@link Buffer} holding the header or the complete frame, size field included
+     * @throws AmqpProtocolException if the announced frame size exceeds the maximum
+     *         frame length or is too small to contain its own size field
+     * @throws IOException if reading from the stream fails
+     */
+    public Object unmarshal(DataInput dataIn) throws IOException {
+        if (!magicRead) {
+            Buffer magic = new Buffer(8);
+            magic.readFrom(dataIn);
+            magicRead = true;
+            return magic;
+        }
+
+        int size = dataIn.readInt();
+        if (size > maxFrameLength) {
+            throw new AmqpProtocolException("Frame size exceeded max frame length.");
+        }
+        // The size comes straight from the peer. A negative value would make the
+        // allocation below fail and a value under 4 cannot even hold the size field
+        // that is written back into the frame, so reject both as protocol errors.
+        if (size < FRAME_SIZE_FIELD_LENGTH) {
+            throw new AmqpProtocolException("Frame size value was invalid: " + size);
+        }
+
+        Buffer frame = new Buffer(size);
+        // Put the size back at the front so the buffer holds the complete frame,
+        // then fill the rest from the stream.
+        frame.bigEndianEditor().writeInt(size);
+        frame.readFrom(dataIn);
+        // NOTE(review): presumably restores the buffer's view to the whole frame
+        // after the editor advanced it - confirm against hawtbuf Buffer.clear().
+        frame.clear();
+        return frame;
+    }
+
+    /**
+     * @param version the version of the wire format
+     */
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /**
+     * @return the version of the wire format
+     */
+    public int getVersion() {
+        return this.version;
+    }
+
+
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * Creates WireFormat objects that marshal the <a href="http://amqp.org/">AMQP</a> protocol.
+ */
+public class AmqpWireFormatFactory implements WireFormatFactory {
+
+    /**
+     * @return a newly created {@link AmqpWireFormat}
+     */
+    public WireFormat createWireFormat() {
+        final AmqpWireFormat amqpWireFormat = new AmqpWireFormat();
+        return amqpWireFormat;
+    }
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+/**
+* Holds the settings shared by inbound transformers; JMSMappingInboundTransformer
+* extends this class. All fields are package-private and carry a default value.
+*
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class InboundTransformer {
+
+ // NOTE(review): presumably the prefix for vendor specific JMS property names - confirm against the code that reads it.
+ String prefixVendor = "JMS_AMQP_";
+ // The three defaults below are initialised from the javax.jms.Message constants.
+ // NOTE(review): presumably used when an incoming message does not carry the value itself - confirm.
+ int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
+ int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
+ long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
+
+}
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java Wed Oct 3 14:15:01 2012
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+/**
+* InboundTransformer subclass that does not yet add any fields or methods of its own.
+* NOTE(review): the name suggests it is meant to map incoming AMQP messages onto JMS
+* message types - confirm once an implementation exists.
+*
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class JMSMappingInboundTransformer extends InboundTransformer {
+
+}