You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/03 16:15:03 UTC

svn commit: r1393500 [1/2] - in /activemq/trunk/activemq-amqp: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/transport/ src/main/java/org/apache/active...

Author: chirino
Date: Wed Oct  3 14:15:01 2012
New Revision: 1393500

URL: http://svn.apache.org/viewvc?rev=1393500&view=rev
Log:
Initial rough cut of AMQP protocol support using the QPID proton project.

Added:
    activemq/trunk/activemq-amqp/
    activemq/trunk/activemq-amqp/pom.xml
    activemq/trunk/activemq-amqp/src/
    activemq/trunk/activemq-amqp/src/main/
    activemq/trunk/activemq-amqp/src/main/java/
    activemq/trunk/activemq-amqp/src/main/java/org/
    activemq/trunk/activemq-amqp/src/main/java/org/apache/
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/package.html
    activemq/trunk/activemq-amqp/src/main/resources/
    activemq/trunk/activemq-amqp/src/main/resources/META-INF/
    activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/
    activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/
    activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/
    activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/
    activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/
    activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp
    activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio
    activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+nio+ssl
    activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
    activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/
    activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/amqp
    activemq/trunk/activemq-amqp/src/test/
    activemq/trunk/activemq-amqp/src/test/java/
    activemq/trunk/activemq-amqp/src/test/java/org/
    activemq/trunk/activemq-amqp/src/test/java/org/apache/
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTest.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java
    activemq/trunk/activemq-amqp/src/test/resources/
    activemq/trunk/activemq-amqp/src/test/resources/log4j.properties   (with props)

Added: activemq/trunk/activemq-amqp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/pom.xml?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/pom.xml (added)
+++ activemq/trunk/activemq-amqp/pom.xml Wed Oct  3 14:15:01 2012
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.activemq</groupId>
+    <artifactId>activemq-parent</artifactId>
+    <version>5.8-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>activemq-amqp</artifactId>
+  <packaging>jar</packaging>
+
+  <name>ActiveMQ :: AMQP</name>
+  <description>ActiveMQ implementation of AMQP messaging protocol</description>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>qpid-proton</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.fusesource.hawtbuf</groupId>
+      <artifactId>hawtbuf</artifactId>
+      <version>${hawtbuf-version}</version>
+    </dependency>
+
+    <!-- Testing Dependencies -->
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-console</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+  
+  <profiles>
+    <profile>
+      <!-- profile which is activated if the swiftmq-client-home prop is defined.
+           It tests the amqp broker impl using the swiftmq client libs -->
+      <id>swiftmq-client</id>
+      <activation>
+        <property><name>swiftmq-client-home</name></property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>com.swiftmq</groupId>
+          <artifactId>swiftmq</artifactId>
+          <version>9.2.0</version>
+          <scope>system</scope>
+          <systemPath>${swiftmq-client-home}/jars/swiftmq.jar</systemPath>
+        </dependency>
+        <dependency>
+          <groupId>com.swiftmq</groupId>
+          <artifactId>swiftmq-amqp</artifactId>
+          <version>9.2.0</version>
+          <scope>system</scope>
+          <systemPath>${swiftmq-client-home}/jars/amqp.jar</systemPath>
+        </dependency>
+      </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>1.7</version>
+            <executions>
+              <execution>
+                <id>add-test-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${basedir}/src/test-swiftmq/java</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+</project>

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPNativeInboundTransformer.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class AMQPNativeInboundTransformer extends InboundTransformer {
+
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.SslTransportFactory;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An <a href="http://amqp.org/">AMQP</a> over SSL transport factory
+ */
+public class AMQPSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
+
+    private BrokerContext brokerContext = null;
+
+    protected String getDefaultWireFormatType() {
+        return "amqp";
+    }
+
+    @SuppressWarnings("rawtypes")
+
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+        transport = new AmqpTransportFilter(transport, format, brokerContext);
+        IntrospectionSupport.setProperties(transport, options);
+        return super.compositeConfigure(transport, format, options);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+        transport = super.serverConfigure(transport, format, options);
+
+        MutexTransport mutex = transport.narrow(MutexTransport.class);
+        if (mutex != null) {
+            mutex.setSyncOnCommand(true);
+        }
+
+        return transport;
+    }
+
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerContext = brokerService.getBrokerContext();
+    }
+
+//    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+//        AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+//
+//        AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
+//        filter.setInactivityMonitor(monitor);
+//
+//        return monitor;
+//    }
+
+
+    @Override
+    protected boolean isUseInactivityMonitor(Transport transport) {
+        return false;
+    }
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.transport.nio.NIOSSLTransport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+
+public class AmqpNioSslTransport extends NIOSSLTransport {
+
+    public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+        super(wireFormat, socketFactory, remoteLocation, localLocation);
+    }
+
+    public AmqpNioSslTransport(WireFormat wireFormat, Socket socket) throws IOException {
+        super(wireFormat, socket);
+    }
+
+    @Override
+    protected void initializeStreams() throws IOException {
+        super.initializeStreams();
+        if (inputBuffer.position() != 0 && inputBuffer.hasRemaining()) {
+            serviceRead();
+        }
+    }
+
+    @Override
+    protected void processCommand(ByteBuffer plain) throws Exception {
+        doConsume(AmqpSupport.toBuffer(plain));
+    }
+
+}
\ No newline at end of file

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.SslContext;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.wireformat.WireFormat;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+public class AmqpNioSslTransportFactory extends AmqpNioTransportFactory {
+
+    SSLContext context;
+
+    @Override
+    protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        return new TcpTransportServer(this, location, serverSocketFactory) {
+            protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+                AmqpNioSslTransport transport = new AmqpNioSslTransport(format, socket);
+                if (context != null) {
+                    transport.setSslContext(context);
+                }
+                return transport;
+            }
+        };
+    }
+
+    @Override
+    protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+        return new AmqpNioSslTransport(wf, socketFactory, location, localLocation);
+    }
+
+    @Override
+    public TransportServer doBind(URI location) throws IOException {
+        if (SslContext.getCurrentSslContext() != null) {
+            try {
+                context = SslContext.getCurrentSslContext().getSSLContext();
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+        return super.doBind(location);
+    }
+
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.transport.nio.NIOOutputStream;
+import org.apache.activemq.transport.nio.SelectorManager;
+import org.apache.activemq.transport.nio.SelectorSelection;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+
+import javax.net.SocketFactory;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/**
+ * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
+ */
+public class AmqpNioTransport extends TcpTransport {
+
+    private SocketChannel channel;
+    private SelectorSelection selection;
+
+    private ByteBuffer inputBuffer;
+
+    public AmqpNioTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+        super(wireFormat, socketFactory, remoteLocation, localLocation);
+    }
+
+    public AmqpNioTransport(WireFormat wireFormat, Socket socket) throws IOException {
+        super(wireFormat, socket);
+    }
+
+    protected void initializeStreams() throws IOException {
+        channel = socket.getChannel();
+        channel.configureBlocking(false);
+        // listen for events telling us when the socket is readable.
+        selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
+            public void onSelect(SelectorSelection selection) {
+                if (!isStopped()) {
+                    serviceRead();
+                }
+            }
+
+            public void onError(SelectorSelection selection, Throwable error) {
+                if (error instanceof IOException) {
+                    onException((IOException) error);
+                } else {
+                    onException(IOExceptionSupport.create(error));
+                }
+            }
+        });
+
+        inputBuffer = ByteBuffer.allocate(8 * 1024);
+        NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
+        this.dataOut = new DataOutputStream(outPutStream);
+        this.buffOut = outPutStream;
+    }
+
+    private void serviceRead() {
+        try {
+
+            while (isStarted()) {
+                // read channel
+                int readSize = channel.read(inputBuffer);
+                // channel is closed, cleanup
+                if (readSize == -1) {
+                    onException(new EOFException());
+                    selection.close();
+                    break;
+                }
+                // nothing more to read, break
+                if (readSize == 0) {
+                    break;
+                }
+
+                inputBuffer.flip();
+                doConsume(AmqpSupport.toBuffer(inputBuffer));
+                // clear the buffer
+                inputBuffer.clear();
+
+            }
+        } catch (IOException e) {
+            onException(e);
+        } catch (Throwable e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    protected void doStart() throws Exception {
+        connect();
+        selection.setInterestOps(SelectionKey.OP_READ);
+        selection.enable();
+    }
+
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        try {
+            if (selection != null) {
+                selection.close();
+            }
+        } finally {
+            super.doStop(stopper);
+        }
+    }
+}
\ No newline at end of file

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.nio.NIOTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An <a href="http://amqp.org/">AMQP</a> over NIO transport factory
+ */
+public class AmqpNioTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
+
+    private BrokerContext brokerContext = null;
+
+    protected String getDefaultWireFormatType() {
+        return "amqp";
+    }
+
+    protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        return new TcpTransportServer(this, location, serverSocketFactory) {
+            protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+                return new AmqpNioTransport(format, socket);
+            }
+        };
+    }
+
+    protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+        return new AmqpNioTransport(wf, socketFactory, location, localLocation);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+        transport = super.serverConfigure(transport, format, options);
+
+        MutexTransport mutex = transport.narrow(MutexTransport.class);
+        if (mutex != null) {
+            mutex.setSyncOnCommand(true);
+        }
+
+        return transport;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+        transport = new AmqpTransportFilter(transport, format, brokerContext);
+        IntrospectionSupport.setProperties(transport, options);
+        return super.compositeConfigure(transport, format, options);
+    }
+
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerContext = brokerService.getBrokerContext();
+    }
+
+//    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+//        AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+//        AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
+//        filter.setInactivityMonitor(monitor);
+//        return monitor;
+//    }
+
+    @Override
+    protected boolean isUseInactivityMonitor(Transport transport) {
+        return false;
+    }
+}
+

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,750 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.command.*;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.util.*;
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.Inflater;
+
+class AmqpProtocolConverter {
+
+    public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
+    public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
+    public static final EnumSet<EndpointState> ACTIVE_STATE = EnumSet.of(EndpointState.ACTIVE);
+    public static final EnumSet<EndpointState> CLOSED_STATE = EnumSet.of(EndpointState.CLOSED);
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
+
+    private final AmqpTransport amqpTransport;
+
+    public AmqpProtocolConverter(AmqpTransport amqpTransport, BrokerContext brokerContext) {
+        this.amqpTransport = amqpTransport; // brokerContext is accepted but not retained by this constructor
+    }
+
+//
+//    private static final Buffer PING_RESP_FRAME = new PINGRESP().encode();
+//
+//
+//    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+//    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
+//
+//    private final ConcurrentHashMap<ConsumerId, AmqpSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSubscription>();
+//    private final ConcurrentHashMap<UTF8Buffer, AmqpSubscription> amqpSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, AmqpSubscription>();
+//    private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>();
+//    private final Map<Destination, UTF8Buffer> amqpTopicMap = new LRUCache<Destination, UTF8Buffer>();
+//    private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>();
+//    private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>();
+//
+//    private final AtomicBoolean connected = new AtomicBoolean(false);
+//    private CONNECT connect;
+//    private String clientId;
+//    private final String QOS_PROPERTY_NAME = "QoSPropertyName";
+
+
+    TransportImpl protonTransport = new TransportImpl();
+    ConnectionImpl protonConnection = new ConnectionImpl();
+
+    {
+        this.protonTransport.bind(this.protonConnection);
+    }
+
+    /**
+     * Drains the bytes the proton transport currently has queued for output and
+     * forwards them to the AMQP peer in chunks of at most 64 KiB.
+     * <p>
+     * An {@link IOException} raised while writing is reported through
+     * {@code amqpTransport.onException} instead of being thrown.
+     */
+    void pumpOut() {
+        try {
+            final int size = 1024 * 64;
+            final byte[] data = new byte[size];
+            int count;
+            // A non-positive count from output() ends the drain.
+            while ((count = protonTransport.output(data, 0, size)) > 0) {
+                final Buffer buffer = new Buffer(data, 0, count);
+                // Frame dumps go to the trace log rather than stdout, and the hex
+                // formatting is only paid for when trace logging is enabled.
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
+                }
+                amqpTransport.sendToAmqp(buffer);
+            }
+        } catch (IOException e) {
+            amqpTransport.onException(e);
+        }
+    }
+
+    /**
+     * Per-session bookkeeping attached to a proton session as its context: the
+     * OpenWire session id it maps to, plus the counters used to number the
+     * producers and consumers created on that session.
+     */
+    static class AmqpSessionContext {
+        private final SessionId sessionId;
+        long nextProducerId = 0;
+        long nextConsumerId = 0;
+
+        /**
+         * @param connectionId the connection the session belongs to
+         * @param id           the session number supplied by the caller
+         */
+        public AmqpSessionContext(ConnectionId connectionId, long id) {
+            // Use the caller-supplied id: hard-coding -1 gave every session of a
+            // connection the same SessionId.
+            sessionId = new SessionId(connectionId, id);
+        }
+    }
+
+    /**
+     * Feeds bytes received from the AMQP peer into the proton transport and then
+     * services the resulting engine state, in this order: the connection open,
+     * newly opened sessions, newly opened links, pending deliveries, remotely
+     * closed links, remotely closed sessions and finally a remotely closed
+     * connection. Output produced by the engine is flushed to the peer at the end.
+     * <p>
+     * Failures are passed to {@link #handleException(Throwable)}, which stops the
+     * transport; handling of this frame ends at that point.
+     *
+     * @param frame the bytes read from the peer
+     */
+    public void onAMQPData(Buffer frame) throws IOException, JMSException {
+
+        try {
+            // Frame dumps go to the trace log rather than stdout.
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("reading: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
+            }
+            protonTransport.input(frame.data, frame.offset, frame.length);
+        } catch (Throwable e) {
+            handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
+            // The transport has been stopped; do not keep driving the engine or
+            // try to write to the peer.
+            return;
+        }
+
+        try {
+
+            // Handle the amqp open..
+            if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED) {
+                onConnectionOpen();
+            }
+
+            // Lets map amqp sessions to openwire sessions..
+            Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
+            while (session != null) {
+                // onSessionOpen() opens the session locally, so asking for the head
+                // again yields the next session that is still uninitialized.
+                onSessionOpen(session);
+                session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
+            }
+
+            Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
+            while (link != null) {
+                onLinkOpen(link);
+                link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
+            }
+
+            // Hand each pending delivery to the listener stored as its link's context.
+            Delivery delivery = protonConnection.getWorkHead();
+            while (delivery != null) {
+                AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
+                if (listener != null) {
+                    listener.onDelivery(delivery);
+                }
+                delivery = delivery.getWorkNext();
+            }
+
+            // Links the peer has closed while they are still active locally.
+            link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
+            while (link != null) {
+                // TODO - notify a listener (receiver close vs. sender close) before closing.
+                link.close();
+                link = link.next(ACTIVE_STATE, CLOSED_STATE);
+            }
+
+            // Sessions the peer has closed while they are still active locally.
+            session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
+            while (session != null) {
+                //TODO - close links?
+                session.close();
+                session = session.next(ACTIVE_STATE, CLOSED_STATE);
+            }
+
+            if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
+                protonConnection.close();
+            }
+
+        } catch (Throwable e) {
+            handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
+            // Same as above: the transport is stopped, so skip the final flush.
+            return;
+        }
+
+        pumpOut();
+    }
+
+    /**
+     * Routes a command coming from the broker: responses go to the handler that
+     * was registered for their correlation id, message dispatches go to the
+     * consumer context registered for their consumer id, connection errors are
+     * passed to {@link #handleException(Throwable)}, broker info is ignored and
+     * anything else is logged at debug level.
+     */
+    public void onActiveMQCommand(Command command) throws Exception {
+        if (command.isResponse()) {
+            final Response reply = (Response) command;
+            final ResponseHandler pending = resposeHandlers.remove(Integer.valueOf(reply.getCorrelationId()));
+            if (pending != null) {
+                pending.onResponse(this, reply);
+                return;
+            }
+            // Pass down any unexpected errors. Should this close the connection?
+            if (reply.isException()) {
+                handleException(((ExceptionResponse) reply).getException());
+            }
+            return;
+        }
+
+        if (command.isMessageDispatch()) {
+            final MessageDispatch dispatch = (MessageDispatch) command;
+            final ConsumerContext target = subscriptionsByConsumerId.get(dispatch.getConsumerId());
+            if (target != null) {
+                target.onMessageDispatch(dispatch);
+            }
+            return;
+        }
+
+        if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
+            // Pass down any unexpected async errors. Should this close the connection?
+            handleException(((ConnectionError) command).getException());
+            return;
+        }
+
+        // Broker info is deliberately ignored; everything else is unknown to us.
+        if (!command.isBrokerInfo()) {
+            LOG.debug("Do not know how to process ActiveMQ Command " + command);
+        }
+    }
+
+    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
+    private ConnectionInfo connectionInfo = new ConnectionInfo();
+    private long nextSessionId = 0;
+
+    /** Callback stored as a link's context and invoked for each delivery pending on that link. */
+    static abstract class AmqpDeliveryListener {
+        public abstract void onDelivery(Delivery delivery) throws Exception;
+    }
+
+    private void onConnectionOpen() throws AmqpProtocolException {
+        connectionInfo.setResponseRequired(true);
+        connectionInfo.setConnectionId(connectionId);
+//        configureInactivityMonitor(connect.keepAlive());
+
+        String clientId = protonConnection.getRemoteContainer();
+        if (clientId != null && !clientId.isEmpty()) {
+            connectionInfo.setClientId(clientId);
+        } else {
+            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
+        }
+
+
+//        String userName = "";
+//        if (connect.userName() != null) {
+//            userName = connect.userName().toString();
+//        }
+//        String passswd = "";
+//        if (connect.password() != null) {
+//            passswd = connect.password().toString();
+//        }
+//        connectionInfo.setUserName(userName);
+//        connectionInfo.setPassword(passswd);
+
+        connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
+
+        sendToActiveMQ(connectionInfo, new ResponseHandler() {
+            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+
+                protonConnection.open();
+                pumpOut();
+
+                if (response.isException()) {
+                    Throwable exception = ((ExceptionResponse) response).getException();
+// TODO: figure out how to close /w an error.
+//                    protonConnection.setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
+                    protonConnection.close();
+                    pumpOut();
+                    amqpTransport.onException(IOExceptionSupport.create(exception));
+                    return;
+                }
+
+            }
+        });
+    }
+
+    private void onSessionOpen(Session session) {
+        AmqpSessionContext sessionContext = new AmqpSessionContext(connectionId, nextSessionId++);
+        session.setContext(sessionContext);
+        sendToActiveMQ(new SessionInfo(sessionContext.sessionId), null);
+        session.open();
+    }
+
+    private void onLinkOpen(Link link) {
+        link.setLocalSourceAddress(link.getRemoteSourceAddress());
+        link.setLocalTargetAddress(link.getRemoteTargetAddress());
+
+        AmqpSessionContext sessionContext = (AmqpSessionContext) link.getSession().getContext();
+        if (link instanceof Receiver) {
+            onReceiverOpen((Receiver) link, sessionContext);
+        } else {
+            onSenderOpen((Sender) link, sessionContext);
+        }
+    }
+
+    /**
+     * Delivery listener for a link on which the AMQP peer produces messages. Each
+     * delivery is read into an {@link ActiveMQBytesMessage} addressed to this
+     * producer's destination and sent to the broker.
+     */
+    class ProducerContext extends AmqpDeliveryListener {
+        private final ProducerId producerId;
+        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+        private final ActiveMQDestination destination;
+
+        /** Message currently being filled; {@code null} until the next delivery starts one. */
+        ActiveMQBytesMessage current;
+
+        public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
+            this.producerId = producerId;
+            this.destination = destination;
+        }
+
+        /**
+         * Reads the delivery into a bytes message and forwards it to the broker
+         * without asking for a response.
+         */
+        @Override
+        public void onDelivery(Delivery delivery) throws JMSException {
+            ActiveMQMessage message = convertMessage((DeliveryImpl) delivery);
+            // The message is handed to the broker below, so the next delivery has
+            // to start a fresh one with its own MessageId. Without this reset every
+            // delivery appended to, and resent, the very same message object.
+            current = null;
+            message.setProducerId(producerId);
+            message.onSend();
+//            sendToActiveMQ(message, createResponseHandler(command));
+            sendToActiveMQ(message, null);
+        }
+
+        /**
+         * Copies the bytes currently readable on the delivery's receiver into the
+         * message being filled and returns that message.
+         */
+        ActiveMQMessage convertMessage(DeliveryImpl delivery) throws JMSException {
+            ActiveMQBytesMessage msg = nextMessage(delivery);
+            final Receiver receiver = (Receiver) delivery.getLink();
+            byte[] buff = new byte[1024 * 4];
+            int count;
+            // Stop on a zero-byte read as well as on a negative one: nothing else
+            // feeds the receiver while this loop runs, so continuing on 0 would
+            // spin forever.
+            // NOTE(review): a 0 return presumably means the delivery is still
+            // partial; the caller forwards what has been read so far - confirm
+            // whether partial deliveries need to be accumulated before sending.
+            while ((count = receiver.recv(buff, 0, buff.length)) > 0) {
+                msg.writeBytes(buff, 0, count);
+            }
+            return msg;
+        }
+
+        /** Returns the message being filled, creating and initialising it on first use. */
+        private ActiveMQBytesMessage nextMessage(DeliveryImpl delivery) throws JMSException {
+            if (current == null) {
+                current = new ActiveMQBytesMessage();
+                current.setJMSDestination(destination);
+                current.setProducerId(producerId);
+                current.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
+                current.setTimestamp(System.currentTimeMillis());
+                current.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
+//            msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
+//            msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace(delivery.getLocalState() + "/" + delivery.getRemoteState());
+                }
+            }
+            return current;
+        }
+
+    }
+
+
+    /**
+     * Sets up the broker-side producer for a link the AMQP peer sends on (a
+     * receiver from our point of view): creates the producer context, grants the
+     * peer credit, registers the producer with the broker and opens the link once
+     * the broker has answered. The link is closed again if the broker answered
+     * with an error.
+     */
+    void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
+        // Client is producing to this receiver object
+
+        ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
+        // Messages on this link flow from the peer to us, so the destination the
+        // peer wants to publish to is the link's target address, not its source.
+        ActiveMQDestination destination = ActiveMQDestination.createDestination(receiver.getRemoteTargetAddress(), ActiveMQDestination.QUEUE_TYPE);
+        ProducerContext producerContext = new ProducerContext(producerId, destination);
+
+        receiver.setContext(producerContext);
+        receiver.flow(1024 * 64);
+        ProducerInfo producerInfo = new ProducerInfo(producerId);
+        producerInfo.setDestination(destination);
+        sendToActiveMQ(producerInfo, new ResponseHandler() {
+            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                receiver.open();
+                if (response.isException()) {
+                    // The broker refused the producer: close the link again.
+                    receiver.close();
+                }
+                pumpOut();
+            }
+        });
+
+    }
+
+
+    class ConsumerContext extends AmqpDeliveryListener {
+        private final ConsumerId consumerId;
+        private final Sender sender;
+
+        long nextTagId = 0;
+        HashSet<byte[]> tagCache = new HashSet<byte[]>();
+
+        /**
+         * Returns a delivery tag: one taken out of {@code tagCache} when the cache
+         * holds any, otherwise the UTF-8 bytes of the hexadecimal form of the
+         * running tag counter.
+         */
+        byte[] nextTag() {
+            if (tagCache != null && !tagCache.isEmpty()) {
+                final Iterator<byte[]> pooled = tagCache.iterator();
+                final byte[] reused = pooled.next();
+                pooled.remove();
+                return reused;
+            }
+
+            final String hex = Long.toHexString(nextTagId++);
+            try {
+                return hex.getBytes("UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public ConsumerContext(ConsumerId consumerId, Sender sender) {
+            this.consumerId = consumerId; // id of the broker consumer this context stands for
+            this.sender = sender; // link that dispatched messages are sent on
+        }
+
+        // Called when the connection receives a JMS message from ActiveMQ: creates a delivery tagged via nextTag(), remembers the dispatch as its context, then encodes and sends the converted message.
+        public void onMessageDispatch(MessageDispatch md) throws Exception {
+            final byte[] tag = nextTag();
+            final Delivery delivery = sender.delivery(tag, 0, tag.length);
+            delivery.setContext(md);
+
+            // Convert to an AMQP message.
+            org.apache.qpid.proton.message.Message msg = convertMessage(md.getMessage());
+            byte buffer[] = new byte[1024*4];
+            int c=0;
+
+            // And send the AMQP message over the link.
+            while( (c=msg.encode(buffer, 0 , 0)) >= 0 ) { // NOTE(review): the length argument is 0 although buffer holds 4 KiB, and the loop only ends on a negative return - confirm against the proton Message.encode contract
+                sender.send(buffer, 0, c);
+            }
+            sender.advance();
+
+        }
+
+        /**
+         * Extracts the body of an ActiveMQ message as raw bytes (text as UTF-8, bytes
+         * as-is, a map via its string form, anything else from its marshalled
+         * content, inflated first when the message is compressed) and returns a new
+         * proton message.
+         * <p>
+         * NOTE(review): the extracted {@code content} is not attached to the
+         * returned message; the result is always a freshly constructed, empty
+         * message. Confirm how the body is meant to be set.
+         *
+         * @param message the ActiveMQ message being dispatched
+         * @return a new proton message
+         */
+        public org.apache.qpid.proton.message.Message convertMessage(Message message) throws Exception {
+//            result.setContentEncoding();
+//            QoS qoS;
+//            if (message.propertyExists(QOS_PROPERTY_NAME)) {
+//                int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
+//                qoS = QoS.values()[ordinal];
+//
+//            } else {
+//                qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
+//            }
+//            result.qos(qoS);
+
+            Buffer content = null;
+            if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
+                ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
+                msg.setReadOnlyBody(true);
+                String messageText = msg.getText();
+                // A text message without a body yields null; treat it as empty
+                // instead of failing with a NullPointerException.
+                content = messageText == null ? new Buffer(0) : new Buffer(messageText.getBytes("UTF-8"));
+            } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
+                ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
+                msg.setReadOnlyBody(true);
+                byte[] data = new byte[(int) msg.getBodyLength()];
+                msg.readBytes(data);
+                content = new Buffer(data);
+            } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
+                ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
+                msg.setReadOnlyBody(true);
+                Map<?, ?> map = msg.getContentMap();
+                content = new Buffer(map.toString().getBytes("UTF-8"));
+            } else {
+                ByteSequence byteSequence = message.getContent();
+                if (byteSequence != null && byteSequence.getLength() > 0) {
+                    if (message.isCompressed()) {
+                        Inflater inflater = new Inflater();
+                        try {
+                            inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
+                            byte[] data = new byte[4096];
+                            int read;
+                            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                            while ((read = inflater.inflate(data)) != 0) {
+                                bytesOut.write(data, 0, read);
+                            }
+                            byteSequence = bytesOut.toByteSequence();
+                        } finally {
+                            // Release the inflater's native memory right away rather
+                            // than waiting for finalization, also when inflating fails.
+                            inflater.end();
+                        }
+                    }
+                    content = new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length);
+                } else {
+                    content = new Buffer(0);
+                }
+            }
+
+            org.apache.qpid.proton.message.Message result = new org.apache.qpid.proton.message.Message();
+            return result;
+        }
+
+
+        @Override
+        public void onDelivery(Delivery delivery) throws JMSException {
+            if( delivery.remotelySettled() ) {
+                MessageDispatch md = (MessageDispatch) delivery.getContext(); // the dispatch stored by onMessageDispatch; nothing is done with it yet
+            }
+        }
+
+    }
+
+    private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
+
+    /**
+     * Sets up the broker-side consumer for a link the AMQP peer receives on (a
+     * sender from our point of view): registers a consumer context under a new
+     * consumer id, subscribes to the queue named by the link's source address and
+     * opens the link once the broker has answered. If the broker answered with an
+     * error, the registration is dropped and the link is closed again.
+     */
+    void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
+
+        final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
+        ConsumerContext consumerContext = new ConsumerContext(id, sender);
+
+        subscriptionsByConsumerId.put(id, consumerContext);
+
+        ActiveMQDestination destination = ActiveMQDestination.createDestination(sender.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE);
+
+        sender.setContext(consumerContext);
+        ConsumerInfo consumerInfo = new ConsumerInfo(id);
+        consumerInfo.setDestination(destination);
+        consumerInfo.setPrefetchSize(100);
+        consumerInfo.setDispatchAsync(true);
+
+        sendToActiveMQ(consumerInfo, new ResponseHandler() {
+            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                sender.open();
+                if (response.isException()) {
+                    // The broker refused the consumer: drop the registration made
+                    // above so it does not linger, then close the link again.
+                    subscriptionsByConsumerId.remove(id);
+                    sender.close();
+                }
+                pumpOut();
+            }
+        });
+
+    }
+
+//
+//    QoS onSubscribe(SUBSCRIBE command, Topic topic) throws AmqpProtocolException {
+//        ActiveMQDestination destination = new ActiveMQTopic(convertAMQPToActiveMQ(topic.name().toString()));
+//        if (destination == null) {
+//            throw new AmqpProtocolException("Invalid Destination.");
+//        }
+//
+//        ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+//        ConsumerInfo consumerInfo = new ConsumerInfo(id);
+//        consumerInfo.setDestination(destination);
+//        consumerInfo.setPrefetchSize(1000);
+//        consumerInfo.setDispatchAsync(true);
+//        if (!connect.cleanSession() && (connect.clientId() != null)) {
+//            //by default subscribers are persistent
+//            consumerInfo.setSubscriptionName(connect.clientId().toString());
+//        }
+//
+//        AmqpSubscription amqpSubscription = new AmqpSubscription(this, topic.qos(), consumerInfo);
+//
+//
+//        amqpSubscriptionByTopic.put(topic.name(), amqpSubscription);
+//
+//        sendToActiveMQ(consumerInfo, null);
+//        return topic.qos();
+//    }
+//
+//    void onUnSubscribe(UNSUBSCRIBE command) {
+//        UTF8Buffer[] topics = command.topics();
+//        if (topics != null) {
+//            for (int i = 0; i < topics.length; i++) {
+//                onUnSubscribe(topics[i]);
+//            }
+//        }
+//        UNSUBACK ack = new UNSUBACK();
+//        ack.messageId(command.messageId());
+//        pumpOut(ack.encode());
+//
+//    }
+//
+//    void onUnSubscribe(UTF8Buffer topicName) {
+//        AmqpSubscription subs = amqpSubscriptionByTopic.remove(topicName);
+//        if (subs != null) {
+//            ConsumerInfo info = subs.getConsumerInfo();
+//            if (info != null) {
+//                subscriptionsByConsumerId.remove(info.getConsumerId());
+//            }
+//            RemoveInfo removeInfo = info.createRemoveCommand();
+//            sendToActiveMQ(removeInfo, null);
+//        }
+//    }
+//
+//
+//    /**
+//     * Dispatch a ActiveMQ command
+//     */
+//
+//
+//
+//    void onAMQPPublish(PUBLISH command) throws IOException, JMSException {
+//        checkConnected();
+//    }
+//
+//    void onAMQPPubAck(PUBACK command) {
+//        short messageId = command.messageId();
+//        MessageAck ack;
+//        synchronized (consumerAcks) {
+//            ack = consumerAcks.remove(messageId);
+//        }
+//        if (ack != null) {
+//            amqpTransport.sendToActiveMQ(ack);
+//        }
+//    }
+//
+//    void onAMQPPubRec(PUBREC commnand) {
+//        //from a subscriber - send a PUBREL in response
+//        PUBREL pubrel = new PUBREL();
+//        pubrel.messageId(commnand.messageId());
+//        pumpOut(pubrel.encode());
+//    }
+//
+//    void onAMQPPubRel(PUBREL command) {
+//        PUBREC ack;
+//        synchronized (publisherRecs) {
+//            ack = publisherRecs.remove(command.messageId());
+//        }
+//        if (ack == null) {
+//            LOG.warn("Unknown PUBREL: " + command.messageId() + " received");
+//        }
+//        PUBCOMP pubcomp = new PUBCOMP();
+//        pubcomp.messageId(command.messageId());
+//        pumpOut(pubcomp.encode());
+//    }
+//
+//    void onAMQPPubComp(PUBCOMP command) {
+//        short messageId = command.messageId();
+//        MessageAck ack;
+//        synchronized (consumerAcks) {
+//            ack = consumerAcks.remove(messageId);
+//        }
+//        if (ack != null) {
+//            amqpTransport.sendToActiveMQ(ack);
+//        }
+//    }
+//
+//
+//
+//
+//    public AmqpTransport amqpTransport {
+//        return amqpTransport;
+//    }
+//
+//
+//
+//    void configureInactivityMonitor(short heartBeat) {
+//        try {
+//
+//            int heartBeatMS = heartBeat * 1000;
+//            AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
+//            monitor.setProtocolConverter(this);
+//            monitor.setReadCheckTime(heartBeatMS);
+//            monitor.setInitialDelayTime(heartBeatMS);
+//            monitor.startMonitorThread();
+//
+//        } catch (Exception ex) {
+//            LOG.warn("Failed to start AMQP InactivityMonitor ", ex);
+//        }
+//
+//        LOG.debug(getClientId() + " AMQP Connection using heart beat of  " + heartBeat + " secs");
+//    }
+//
+//
+//
+//    void checkConnected() throws AmqpProtocolException {
+//        if (!connected.get()) {
+//            throw new AmqpProtocolException("Not connected.");
+//        }
+//    }
+//
+//    private String getClientId() {
+//        if (clientId == null) {
+//            if (connect != null && connect.clientId() != null) {
+//                clientId = connect.clientId().toString();
+//            }
+//        } else {
+//            clientId = "";
+//        }
+//        return clientId;
+//    }
+//
+//    private void stopTransport() {
+//        try {
+//            amqpTransport.stop();
+//        } catch (Throwable e) {
+//            LOG.debug("Failed to stop AMQP transport ", e);
+//        }
+//    }
+//
+//    ResponseHandler createResponseHandler(final PUBLISH command) {
+//
+//        if (command != null) {
+//            switch (command.qos()) {
+//                case AT_LEAST_ONCE:
+//                    return new ResponseHandler() {
+//                        public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+//                            if (response.isException()) {
+//                                LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse) response).getException());
+//                            } else {
+//                                PUBACK ack = new PUBACK();
+//                                ack.messageId(command.messageId());
+//                                converter.amqpTransport.sendToAmqp(ack.encode());
+//                            }
+//                        }
+//                    };
+//                case EXACTLY_ONCE:
+//                    return new ResponseHandler() {
+//                        public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+//                            if (response.isException()) {
+//                                LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse) response).getException());
+//                            } else {
+//                                PUBREC ack = new PUBREC();
+//                                ack.messageId(command.messageId());
+//                                synchronized (publisherRecs) {
+//                                    publisherRecs.put(command.messageId(), ack);
+//                                }
+//                                converter.amqpTransport.sendToAmqp(ack.encode());
+//                            }
+//                        }
+//                    };
+//                case AT_MOST_ONCE:
+//                    break;
+//            }
+//        }
+//        return null;
+//    }
+//
+//    private String convertAMQPToActiveMQ(String name) {
+//        String result = name.replace('#', '>');
+//        result = result.replace('+', '*');
+//        result = result.replace('/', '.');
+//        return result;
+//    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    //
+    // Implementation methods
+    //
+    ////////////////////////////////////////////////////////////////////////////
+
+    private final Object commnadIdMutex = new Object();
+    private int lastCommandId;
+
+    int generateCommandId() {
+        synchronized (commnadIdMutex) {
+            return lastCommandId++;
+        }
+    }
+
+    private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
+
+    void sendToActiveMQ(Command command, ResponseHandler handler) {
+        command.setCommandId(generateCommandId());
+        if (handler != null) {
+            command.setResponseRequired(true);
+            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
+        }
+        amqpTransport.sendToActiveMQ(command);
+    }
+
+    /**
+     * Reports the given failure through the logger and stops the AMQP transport.
+     * A failure while stopping is logged and otherwise ignored.
+     *
+     * @param exception the failure that triggered the shutdown; may be null
+     */
+    void handleException(Throwable exception) {
+        // Report through the logger instead of printStackTrace(); string
+        // concatenation also copes with a null exception.
+        LOG.warn("Exception occurred processing AMQP, stopping the transport: " + exception);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Exception detail", exception);
+        }
+        try {
+            amqpTransport.stop();
+        } catch (Throwable e) {
+            LOG.error("Failed to stop AMQP Transport ", e);
+        }
+    }
+
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import java.io.IOException;
+
+
+/**
+ * An {@link IOException} used by the AMQP transport code in this package to report
+ * protocol-level problems. It carries a boolean "fatal" flag alongside the usual
+ * message and cause; how callers react to that flag is not defined by this class.
+ */
+public class AmqpProtocolException extends IOException {
+
+    private static final long serialVersionUID = -2869735532997332242L;
+
+    private final boolean fatal;
+
+    /**
+     * Creates a non-fatal exception with no message and no cause.
+     */
+    public AmqpProtocolException() {
+        this(null);
+    }
+
+    /**
+     * Creates a non-fatal exception with no cause.
+     *
+     * @param s the detail message, may be null
+     */
+    public AmqpProtocolException(String s) {
+        this(s, false);
+    }
+
+    /**
+     * Creates an exception with no cause.
+     *
+     * @param s     the detail message, may be null
+     * @param fatal value later reported by {@link #isFatal()}
+     */
+    public AmqpProtocolException(String s, boolean fatal) {
+        this(s, fatal, null);
+    }
+
+    /**
+     * Creates an exception with an optional cause.
+     *
+     * @param s     the detail message, may be null
+     * @param fatal value later reported by {@link #isFatal()}
+     * @param cause the underlying failure, or null when there is none
+     */
+    public AmqpProtocolException(String s, boolean fatal, Throwable cause) {
+        super(s);
+        this.fatal = fatal;
+        // Only record a real cause. initCause(null) would mark the cause as already
+        // set, and any later initCause(...) on this exception would then fail with
+        // IllegalStateException.
+        if (cause != null) {
+            initCause(cause);
+        }
+    }
+
+    /**
+     * @return the fatal flag supplied at construction time (false for the
+     *         constructors that do not take one)
+     */
+    public boolean isFatal() {
+        return fatal;
+    }
+
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSubscription.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.command.*;
+
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.util.zip.DataFormatException;
+
+/**
+ * Placeholder meant to keep track of the AMQP client subscription so that acking
+ * is correctly done.
+ *
+ * The class currently declares no members: its whole body is commented out.
+ * NOTE(review): the commented-out code refers to PUBLISH and QoS types, which
+ * suggests it was carried over from another transport and still has to be adapted
+ * for AMQP -- confirm before reviving it.
+ */
+class AmqpSubscription {
+//    private final AmqpProtocolConverter protocolConverter;
+//
+//    private final ConsumerInfo consumerInfo;
+//    private ActiveMQDestination destination;
+//    private final QoS qos;
+//
+//    public AmqpSubscription(AmqpProtocolConverter protocolConverter, QoS qos, ConsumerInfo consumerInfo) {
+//        this.protocolConverter = protocolConverter;
+//        this.consumerInfo = consumerInfo;
+//        this.qos = qos;
+//    }
+//
+//    MessageAck createMessageAck(MessageDispatch md) {
+//        return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+//    }
+//
+//    PUBLISH createPublish(ActiveMQMessage message) throws DataFormatException, IOException, JMSException {
+//        PUBLISH publish = protocolConverter.convertMessage(message);
+//        if (publish.qos().ordinal() > this.qos.ordinal()) {
+//            publish.qos(this.qos);
+//        }
+//        return publish;
+//    }
+//
+//    public boolean expectAck() {
+//        return qos != QoS.AT_MOST_ONCE;
+//    }
+//
+//    public void setDestination(ActiveMQDestination destination) {
+//        this.destination = destination;
+//    }
+//
+//    public ActiveMQDestination getDestination() {
+//        return destination;
+//    }
+//
+//    public ConsumerInfo getConsumerInfo() {
+//        return consumerInfo;
+//    }
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.fusesource.hawtbuf.Buffer;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Static helper methods for the AMQP transport package.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpSupport {
+
+    /**
+     * Converts the remaining bytes of an NIO {@link ByteBuffer} into a hawtbuf
+     * {@link Buffer} and consumes them from the source.
+     *
+     * When the source exposes a backing array, the result is built directly from the
+     * source buffer (presumably sharing that array -- confirm against the hawtbuf
+     * {@code Buffer(ByteBuffer)} constructor). Otherwise, e.g. for direct or read-only
+     * buffers, the remaining bytes are copied into a new array. In both cases the
+     * source's position ends up at its limit.
+     *
+     * @param data the buffer to convert, may be null
+     * @return a Buffer holding the bytes that were remaining in {@code data},
+     *         or null when {@code data} is null
+     */
+    public static Buffer toBuffer(ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+        // hasArray() rather than !isDirect(): a read-only heap buffer is not direct,
+        // yet it does not expose an array, so it has to take the copy path as well.
+        if (data.hasArray()) {
+            Buffer wrapped = new Buffer(data);
+            // Mark everything as consumed, matching what the bulk get() below does.
+            data.position(data.limit());
+            return wrapped;
+        }
+        Buffer copied = new Buffer(data.remaining());
+        data.get(copied.data);
+        return copied;
+    }
+
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.command.Command;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.IOException;
+import java.security.cert.X509Certificate;
+
+/**
+ * Basic interface that mediates between the protocol converter and the transport.
+ */
+public interface AmqpTransport {
+
+    /**
+     * Delivers an ActiveMQ command coming from the protocol converter.
+     *
+     * @param command the command to deliver
+     */
+    void sendToActiveMQ(Command command);
+
+    /**
+     * Sends AMQP data towards the AMQP peer.
+     *
+     * @param command the data to send
+     * @throws IOException if the data cannot be sent
+     */
+    void sendToAmqp(Buffer command) throws IOException;
+
+    /**
+     * @return the certificates presented by the peer; may be null (the implementation
+     *         in AmqpTransportFilter returns null when the underlying transport is
+     *         not an SslTransport)
+     */
+    X509Certificate[] getPeerCertificates();
+
+    /**
+     * Reports an I/O failure to the transport.
+     *
+     * @param error the failure
+     */
+    void onException(IOException error);
+
+//    public AmqpInactivityMonitor getInactivityMonitor();
+
+    /**
+     * @return the AMQP wire format associated with this transport
+     */
+    AmqpWireFormat getWireFormat();
+
+    /**
+     * Stops the transport.
+     *
+     * @throws Exception if stopping fails
+     */
+    void stop() throws Exception;
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A <a href="http://amqp.org/">AMQP</a> transport factory.
+ *
+ * It builds on {@link TcpTransportFactory}, wraps every transport it configures in an
+ * {@link AmqpTransportFilter}, and never uses an inactivity monitor.
+ */
+public class AmqpTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
+
+    // Set through setBrokerService(); remains null until that has been called.
+    private BrokerContext brokerContext;
+
+    /**
+     * @return the wire format name used when none is specified: "amqp"
+     */
+    protected String getDefaultWireFormatType() {
+        return "amqp";
+    }
+
+    /**
+     * Wraps the given transport in an {@link AmqpTransportFilter}, applies the supplied
+     * options to that filter, and then lets the superclass finish the configuration.
+     *
+     * @param transport the transport to wrap
+     * @param format    the wire format handed to the filter and to the superclass
+     * @param options   properties applied to the filter via IntrospectionSupport
+     * @return whatever the superclass returns for the wrapped transport
+     */
+    @SuppressWarnings("rawtypes")
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+        Transport amqpFilter = new AmqpTransportFilter(transport, format, brokerContext);
+        IntrospectionSupport.setProperties(amqpFilter, options);
+        return super.compositeConfigure(amqpFilter, format, options);
+    }
+
+    /**
+     * Remembers the broker context of the given broker service so that it can be
+     * handed to the filters created afterwards.
+     *
+     * @param brokerService the broker service this factory belongs to
+     */
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerContext = brokerService.getBrokerContext();
+    }
+
+    /**
+     * Performs the superclass server-side configuration and then, when the resulting
+     * chain contains a {@link MutexTransport}, switches on its sync-on-command flag.
+     *
+     * @return the transport produced by the superclass
+     * @throws Exception if the superclass configuration fails
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+        final Transport configured = super.serverConfigure(transport, format, options);
+
+        final MutexTransport mutexLayer = configured.narrow(MutexTransport.class);
+        if (mutexLayer != null) {
+            mutexLayer.setSyncOnCommand(true);
+        }
+
+        return configured;
+    }
+
+//    @Override
+//    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+//        AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+//
+//        AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
+//        filter.setInactivityMonitor(monitor);
+//
+//        return monitor;
+//    }
+
+    /**
+     * @return always false; the inactivity monitor is disabled for this transport
+     */
+    @Override
+    protected boolean isUseInactivityMonitor(Transport transport) {
+        return false;
+    }
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.tcp.SslTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.security.cert.X509Certificate;
+
+/**
+ * The AmqpTransportFilter normally sits on top of a TcpTransport that has been
+ * configured with the AmqpWireFormat and is used to convert AMQP data to
+ * ActiveMQ commands and back. All of the conversion work is done by delegating
+ * to the AmqpProtocolConverter.
+ */
+public class AmqpTransportFilter extends TransportFilter implements AmqpTransport {
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);
+    private static final Logger TRACE = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".AMQPIO");
+    private final AmqpProtocolConverter protocolConverter;
+//    private AmqpInactivityMonitor monitor;
+    // Stays null when the wire format given to the constructor is not an AmqpWireFormat.
+    private AmqpWireFormat wireFormat;
+
+    // When true, data passing through onCommand()/sendToAmqp() is written to the TRACE logger.
+    private boolean trace;
+
+    /**
+     * @param next          the transport this filter wraps
+     * @param wireFormat    kept only when it is an {@link AmqpWireFormat}
+     * @param brokerContext passed on to the protocol converter
+     */
+    public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
+        super(next);
+        this.protocolConverter = new AmqpProtocolConverter(this, brokerContext);
+        this.wireFormat = (wireFormat instanceof AmqpWireFormat) ? (AmqpWireFormat) wireFormat : null;
+    }
+
+    /**
+     * Hands an outgoing ActiveMQ command to the protocol converter.
+     *
+     * @param o the command; expected to be a {@link Command}
+     * @throws IOException wrapping any exception raised while casting or converting
+     */
+    public void oneway(Object o) throws IOException {
+        try {
+            protocolConverter.onActiveMQCommand((Command) o);
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Hands data received from the wrapped transport to the protocol converter.
+     * I/O and JMS failures are reported through the exception callbacks instead
+     * of being thrown.
+     *
+     * @param command the received data; expected to be a {@link Buffer}
+     */
+    public void onCommand(Object command) {
+        try {
+            if (trace) {
+                TRACE.trace("Received: \n" + command);
+            }
+
+            final Buffer amqpData = (Buffer) command;
+            protocolConverter.onAMQPData(amqpData);
+        } catch (IOException e) {
+            handleException(e);
+        } catch (JMSException e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    /**
+     * Passes a command to the transport listener, if one is currently set.
+     */
+    public void sendToActiveMQ(Command command) {
+        // Read the field once so the null check and the call see the same listener.
+        final TransportListener listener = transportListener;
+        if (listener == null) {
+            return;
+        }
+        listener.onCommand(command);
+    }
+
+    /**
+     * Writes AMQP data to the wrapped transport, if there is one.
+     *
+     * @throws IOException if the wrapped transport fails to send
+     */
+    public void sendToAmqp(Buffer command) throws IOException {
+        if (trace) {
+            TRACE.trace("Sending: \n" + command);
+        }
+        // Read the field once so the null check and the call see the same transport.
+        final Transport downstream = next;
+        if (downstream == null) {
+            return;
+        }
+        downstream.oneway(command);
+    }
+
+    /**
+     * @return the peer certificates reported by the wrapped transport when it is an
+     *         {@link SslTransport}; null otherwise
+     */
+    public X509Certificate[] getPeerCertificates() {
+        if (!(next instanceof SslTransport)) {
+            return null;
+        }
+        final X509Certificate[] certificates = ((SslTransport) next).getPeerCertificates();
+        if (trace && certificates != null) {
+            LOG.debug("Peer Identity has been verified\n");
+        }
+        return certificates;
+    }
+
+    public boolean isTrace() {
+        return trace;
+    }
+
+    public void setTrace(boolean trace) {
+        this.trace = trace;
+    }
+
+//    @Override
+//    public AmqpInactivityMonitor getInactivityMonitor() {
+//        return monitor;
+//    }
+//
+//    public void setInactivityMonitor(AmqpInactivityMonitor monitor) {
+//        this.monitor = monitor;
+//    }
+
+    /**
+     * @return the AMQP wire format captured by the constructor, possibly null
+     */
+    @Override
+    public AmqpWireFormat getWireFormat() {
+        return this.wireFormat;
+    }
+
+    /**
+     * Forwards an I/O failure to the superclass exception handling.
+     */
+    public void handleException(IOException e) {
+        super.onException(e);
+    }
+
+
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.*;
+
+/**
+ * {@link WireFormat} that moves raw AMQP bytes on and off the wire as hawtbuf
+ * {@link Buffer}s, without interpreting their content.
+ *
+ * Reading is stateful: the first {@link #unmarshal(DataInput)} call on an instance
+ * returns the 8 leading bytes of the stream, every later call returns one
+ * size-prefixed frame. An instance therefore cannot be reused for a second stream.
+ */
+public class AmqpWireFormat implements WireFormat {
+
+    /** Number of bytes taken by the size prefix that starts every frame. */
+    private static final int FRAME_SIZE_PREFIX_LENGTH = 4;
+
+    private int version = 1;
+    // Largest frame size accepted by unmarshal(DataInput): 100 MiB.
+    private long maxFrameLength = 1024*1024*100;
+
+    /**
+     * Marshals a command into a new byte sequence.
+     *
+     * @param command the data to write; must be a {@link Buffer}
+     * @return the written bytes
+     * @throws IOException if writing fails
+     */
+    public ByteSequence marshal(Object command) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        marshal(command, dos);
+        dos.close();
+        return baos.toByteSequence();
+    }
+
+    /**
+     * Unmarshals from an in-memory packet; see {@link #unmarshal(DataInput)}.
+     */
+    public Object unmarshal(ByteSequence packet) throws IOException {
+        ByteArrayInputStream stream = new ByteArrayInputStream(packet);
+        DataInputStream dis = new DataInputStream(stream);
+        return unmarshal(dis);
+    }
+
+    /**
+     * Writes the given {@link Buffer} to the output as is.
+     *
+     * @param command the data to write; must be a {@link Buffer}
+     * @throws IOException if writing fails
+     */
+    public void marshal(Object command, DataOutput dataOut) throws IOException {
+        Buffer frame = (Buffer) command;
+        frame.writeTo(dataOut);
+    }
+
+    // Becomes true once the 8 leading bytes of the stream have been returned.
+    boolean magicRead = false;
+
+    /**
+     * Reads the next unit from the stream: the 8 leading bytes on the first call,
+     * one complete frame (size prefix included) on every later call.
+     *
+     * @return a {@link Buffer} holding the bytes read
+     * @throws AmqpProtocolException if the announced frame size exceeds the maximum
+     *         frame length or is too small to contain its own size prefix
+     * @throws IOException if reading from {@code dataIn} fails
+     */
+    public Object unmarshal(DataInput dataIn) throws IOException {
+        if( !magicRead ) {
+            Buffer magic = new Buffer(8);
+            magic.readFrom(dataIn);
+            magicRead = true;
+            return magic;
+        } else {
+            int size = dataIn.readInt();
+            if( size > maxFrameLength ) {
+                throw new AmqpProtocolException("Frame size exceeded max frame length.");
+            }
+            // The size comes from the peer. A negative value, or one that cannot even
+            // hold the prefix written back below, must be rejected as a protocol error
+            // rather than failing with an unchecked exception.
+            if( size < FRAME_SIZE_PREFIX_LENGTH ) {
+                throw new AmqpProtocolException("Frame size " + size + " is too small to be a valid frame.");
+            }
+            Buffer frame = new Buffer(size);
+            // Put the size prefix that was just consumed back at the front of the frame.
+            frame.bigEndianEditor().writeInt(size);
+            // NOTE(review): presumably reads the rest of the frame behind the prefix,
+            // and clear() then makes the buffer cover the whole frame again -- confirm
+            // against the hawtbuf Buffer implementation.
+            frame.readFrom(dataIn);
+            frame.clear();
+            return frame;
+        }
+    }
+
+    /**
+     * @param version the version of the wire format
+     */
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /**
+     * @return the version of the wire format
+     */
+    public int getVersion() {
+        return this.version;
+    }
+
+
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * Creates WireFormat objects that marshal the <a href="http://amqp.org/">AMQP</a> protocol.
+ * Every call to {@link #createWireFormat()} returns a new {@link AmqpWireFormat}.
+ */
+public class AmqpWireFormatFactory implements WireFormatFactory {
+    public WireFormat createWireFormat() {
+        return new AmqpWireFormat();
+    }
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/InboundTransformer.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+/**
+ * Holds the settings of an inbound transformer. The class declares only
+ * package-private fields with default values and no behaviour of its own.
+ * NOTE(review): how subclasses and callers use these values is not visible here.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class InboundTransformer {
+
+    // Vendor prefix string; presumably prepended to JMS property names -- confirm with the users of this field.
+    String prefixVendor = "JMS_AMQP_";
+    // Defaults taken from the JMS API constants on javax.jms.Message.
+    int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
+    int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
+    long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
+
+}

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java?rev=1393500&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/JMSMappingInboundTransformer.java Wed Oct  3 14:15:01 2012
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+/**
+ * An {@link InboundTransformer} subclass that currently adds nothing to its parent:
+ * the class body is empty.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class JMSMappingInboundTransformer extends InboundTransformer {
+
+}