You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/26 16:39:47 UTC

svn commit: r1402532 - in /activemq/trunk/activemq-amqp/src: main/java/org/apache/activemq/transport/amqp/ test/java/org/apache/activemq/transport/amqp/

Author: chirino
Date: Fri Oct 26 14:39:46 2012
New Revision: 1402532

URL: http://svn.apache.org/viewvc?rev=1402532&view=rev
Log:
PLAIN SASL authentication is now working.

Added:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java

Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java?rev=1402532&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java Fri Oct 26 14:39:46 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.fusesource.hawtbuf.Buffer;
+
+/** Wraps the 8-byte AMQP protocol header: the bytes 'A','M','Q','P' followed by protocol-id, major, minor and revision bytes.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpHeader {
+    /** The four bytes 'A','M','Q','P' that a buffer must start with to be accepted by {@link #setBuffer(Buffer)}. */
+    static final Buffer PREFIX = new Buffer(new byte[]{
+      'A', 'M', 'Q', 'P'
+    });
+    /** The wrapped header bytes; within this class it is assigned only by {@link #setBuffer(Buffer)}. */
+    private Buffer buffer;
+    /** Creates a header with the bytes 'A','M','Q','P',0,1,0,0, i.e. protocol id 0, major 1, minor 0, revision 0. */
+    public AmqpHeader(){
+        this(new Buffer(new byte[]{
+          'A', 'M', 'Q', 'P', 0, 1, 0, 0
+        }));
+    }
+    /** Wraps the given buffer; throws IllegalArgumentException when {@link #setBuffer(Buffer)} rejects it. */
+    public AmqpHeader(Buffer buffer){
+        setBuffer(buffer);
+    }
+    /** Returns the protocol-id byte, read with {@code buffer.get(4)} and masked with 0xFF so the result is in 0..255. */
+    public int getProtocolId() {
+        return buffer.get(4) & 0xFF;
+    }
+    public void setProtocolId(int value) { // overwrites header byte 4 in the backing array; the cast keeps only the low 8 bits of value
+        buffer.data[buffer.offset+4] = (byte) value;
+    }
+    /** Returns the major-version byte, read with {@code buffer.get(5)} and masked with 0xFF so the result is in 0..255. */
+    public int getMajor() {
+        return buffer.get(5) & 0xFF;
+    }
+    public void setMajor(int value) { // overwrites header byte 5 in the backing array; the cast keeps only the low 8 bits of value
+        buffer.data[buffer.offset+5] = (byte) value;
+    }
+    /** Returns the minor-version byte, read with {@code buffer.get(6)} and masked with 0xFF so the result is in 0..255. */
+    public int getMinor() {
+        return buffer.get(6) & 0xFF;
+    }
+    public void setMinor(int value) { // overwrites header byte 6 in the backing array; the cast keeps only the low 8 bits of value
+        buffer.data[buffer.offset+6] = (byte) value;
+    }
+    /** Returns the revision byte, read with {@code buffer.get(7)} and masked with 0xFF so the result is in 0..255. */
+    public int getRevision() {
+        return buffer.get(7) & 0xFF;
+    }
+    public void setRevision(int value) { // overwrites header byte 7 in the backing array; the cast keeps only the low 8 bits of value
+        buffer.data[buffer.offset+7] = (byte) value;
+    }
+    /** Returns the wrapped Buffer object itself, not a copy. */
+    public Buffer getBuffer() {
+        return buffer;
+    }
+    public void setBuffer(Buffer value) { // validates value, then stores value.buffer(); a null value fails with NullPointerException on the check below
+        if( !value.startsWith(PREFIX) || value.length()!=8 ) { // accepted only if it starts with PREFIX and is exactly 8 bytes long
+            throw new IllegalArgumentException("Not an AMQP header buffer");
+        }
+        buffer = value.buffer(); // NOTE(review): presumably shares value's backing array rather than copying it - confirm against hawtbuf Buffer.buffer()
+    }
+
+    /** Returns {@code buffer.toString()}, i.e. whatever text form the wrapped Buffer produces. */
+    @Override
+    public String toString() {
+        return buffer.toString();
+    }
+}

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1402532&r1=1402531&r2=1402532&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Fri Oct 26 14:39:46 2012
@@ -22,12 +22,8 @@ import org.apache.activemq.transport.amq
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
-import org.apache.qpid.proton.codec.Decoder;
-import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.engine.*;
-import org.apache.qpid.proton.engine.impl.ConnectionImpl;
-import org.apache.qpid.proton.engine.impl.ProtocolTracer;
-import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.apache.qpid.proton.engine.impl.*;
 import org.apache.qpid.proton.framing.TransportFrame;
 import org.apache.qpid.proton.type.Binary;
 import org.apache.qpid.proton.type.DescribedType;
@@ -45,7 +41,6 @@ import org.fusesource.hawtbuf.ByteArrayO
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.JMSException;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.*;
@@ -141,79 +136,128 @@ class AmqpProtocolConverter {
         }
     }
 
+    Sasl sasl;
+
     /**
      * Convert a AMQP command
      */
-    public void onAMQPData(Buffer frame) throws IOException, JMSException {
-
-
-        try {
-//            System.out.println("reading: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
-            protonTransport.input(frame.data, frame.offset, frame.length);
-        } catch (Throwable e) {
-            handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
+    public void onAMQPData(Object command) throws Exception { // accepts either an AmqpHeader or a raw frame Buffer and forwards its bytes to onFrame()
+        Buffer frame;
+        if( command.getClass() == AmqpHeader.class ) { // exact class match (not instanceof); a null command throws NullPointerException here
+            AmqpHeader header = (AmqpHeader)command;
+            switch( header.getProtocolId() ) {
+                case 0:
+                    // amqpTransport.sendToAmqp(new AmqpHeader());
+                    break; // nothing to do..
+                case 3: // Client will be using SASL for auth..
+                    sasl = protonTransport.sasl(); // a non-null sasl field makes onFrame() attempt the SASL handshake
+                    sasl.setMechanisms(new String[]{"ANONYMOUS", "PLAIN"}); // the only two mechanism names onFrame() knows how to complete
+                    sasl.server();
+                    break;
+                default: // any other protocol id gets no special setup; the header bytes are still passed to onFrame() below
+            }
+            frame = header.getBuffer(); // the header's own bytes are handed to onFrame() like any other input
+        } else {
+            frame = (Buffer)command; // anything that is not an AmqpHeader must be a Buffer, otherwise this cast throws ClassCastException
         }
+        onFrame(frame);
+    }
 
-        try {
+    public void onFrame(Buffer frame) throws Exception { // feeds frame to the proton transport, then services SASL, connection, session, link and delivery state, repeating until frame is empty
+//        System.out.println("read: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
+        while( frame.length > 0 ) { // one pass per chunk that protonTransport.input() reports as consumed
+            try {
+                int count = protonTransport.input(frame.data, frame.offset, frame.length);
+                frame.moveHead(count); // drop the consumed bytes so the next pass starts after them
+            } catch (Throwable e) { // NOTE(review): when input() throws, moveHead() is skipped; unless handleException() throws or otherwise ends processing, the loop would retry the same bytes - confirm
+                handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
+            }
+            try {
+
+                if( sasl!=null ) { // set by onAMQPData() for protocol id 3 and cleared below once the handshake is answered
+                    // Lets try to complete the sasl handshake.
+                    if( sasl.getRemoteMechanisms().length > 0 ) { // presumably non-empty once the peer has named its mechanism - confirm against proton Sasl
+                        if( "PLAIN".equals(sasl.getRemoteMechanisms()[0]) ) {
+                            byte[] data = new byte[sasl.pending()]; // buffer sized by what sasl.pending() reports
+                            sasl.recv(data, 0, data.length);
+                            Buffer[] parts = new Buffer(data).split((byte) 0); // the received bytes are split on zero bytes
+                            if( parts.length > 0 ) {
+                                connectionInfo.setUserName(parts[0].utf8().toString()); // first part becomes the user name
+                            }
+                            if( parts.length > 1 ) {
+                                connectionInfo.setPassword(parts[1].utf8().toString()); // second part becomes the password
+                            }
+                            // We can't really auth at this point since we don't know the client id yet.. :(
+                            sasl.done(Sasl.SaslOutcome.PN_SASL_OK); // always reports success; the credentials are only recorded on connectionInfo here, not checked
+                            amqpTransport.getWireFormat().magicRead = false; // presumably makes AmqpWireFormat read another 8-byte header next - confirm
+                            sasl = null;
+                        } else if( "ANONYMOUS".equals(sasl.getRemoteMechanisms()[0]) ) { // no credentials are read for ANONYMOUS
+                            sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+                            amqpTransport.getWireFormat().magicRead = false;
+                            sasl = null;
+                        }
+                    }
+                }
 
-            // Handle the amqp open..
-            if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED) {
-                onConnectionOpen();
-            }
+                // Handle the amqp open..
+                if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED) {
+                    onConnectionOpen();
+                }
 
-            // Lets map amqp sessions to openwire sessions..
-            Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
-            while (session != null) {
+                // Lets map amqp sessions to openwire sessions..
+                Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
+                while (session != null) {
 
-                onSessionOpen(session);
-                session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
-            }
+                    onSessionOpen(session);
+                    session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET); // re-queries the head each pass; presumably onSessionOpen() takes the session out of this filter - confirm
+                }
 
 
-            Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
-            while (link != null) {
-                onLinkOpen(link);
-                link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
-            }
+                Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
+                while (link != null) {
+                    onLinkOpen(link);
+                    link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET); // same head re-query pattern as the session loop above
+                }
 
-            Delivery delivery = protonConnection.getWorkHead();
-            while (delivery != null) {
-                AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
-                if (listener != null) {
-                    listener.onDelivery(delivery);
+                Delivery delivery = protonConnection.getWorkHead();
+                while (delivery != null) {
+                    AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
+                    if (listener != null) { // deliveries whose link has no listener context are skipped
+                        listener.onDelivery(delivery);
+                    }
+                    delivery = delivery.getWorkNext();
                 }
-                delivery = delivery.getWorkNext();
-            }
 
-            link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
-            while (link != null) {
-                ((AmqpDeliveryListener)link.getContext()).onClose();
-                link.close();
-                link = link.next(ACTIVE_STATE, CLOSED_STATE);
-            }
+                link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
+                while (link != null) {
+                    ((AmqpDeliveryListener)link.getContext()).onClose(); // unlike the delivery loop, the link context is not null-checked here
+                    link.close();
+                    link = link.next(ACTIVE_STATE, CLOSED_STATE);
+                }
 
-            link = protonConnection.linkHead(ACTIVE_STATE, ALL_STATES);
-            while (link != null) {
-                ((AmqpDeliveryListener)link.getContext()).drainCheck();
-                link = link.next(ACTIVE_STATE, CLOSED_STATE);
-            }
+                link = protonConnection.linkHead(ACTIVE_STATE, ALL_STATES);
+                while (link != null) {
+                    ((AmqpDeliveryListener)link.getContext()).drainCheck();
+                    link = link.next(ACTIVE_STATE, CLOSED_STATE); // NOTE(review): the walk starts with (ACTIVE_STATE, ALL_STATES) but advances with (ACTIVE_STATE, CLOSED_STATE) - confirm this is intended
+                }
 
 
-            session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
-            while (session != null) {
-                //TODO - close links?
-                onSessionClose(session);
-                session = session.next(ACTIVE_STATE, CLOSED_STATE);
-            }
-            if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
-                doClose();
+                session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
+                while (session != null) {
+                    //TODO - close links?
+                    onSessionClose(session);
+                    session = session.next(ACTIVE_STATE, CLOSED_STATE);
+                }
+                if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
+                    doClose();
+                }
+
+            } catch (Throwable e) { // any failure while servicing the state above is reported through handleException()
+                handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
             }
 
-        } catch (Throwable e) {
-            handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
+            pumpProtonToSocket(); // now runs once per loop pass, before the remaining bytes of frame are processed
         }
-
-        pumpProtonToSocket();
     }
 
     boolean closing = false;

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java?rev=1402532&r1=1402531&r2=1402532&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java Fri Oct 26 14:39:46 2012
@@ -29,7 +29,7 @@ public interface AmqpTransport {
 
     public void sendToActiveMQ(Command command);
 
-    public void sendToAmqp(Buffer command) throws IOException;
+    public void sendToAmqp(Object command) throws IOException;
 
     public X509Certificate[] getPeerCertificates();
 

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java?rev=1402532&r1=1402531&r2=1402532&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java Fri Oct 26 14:39:46 2012
@@ -93,13 +93,13 @@ public class AmqpTransportFilter extends
             }
             protocolConverter.lock.lock();
             try {
-                protocolConverter.onAMQPData((Buffer) command);
+                protocolConverter.onAMQPData(command);
             } finally {
                 protocolConverter.lock.unlock();
             }
         } catch (IOException e) {
             handleException(e);
-        } catch (JMSException e) {
+        } catch (Exception e) {
             onException(IOExceptionSupport.create(e));
         }
     }
@@ -112,7 +112,7 @@ public class AmqpTransportFilter extends
         }
     }
 
-    public void sendToAmqp(Buffer command) throws IOException {
+    public void sendToAmqp(Object command) throws IOException {
         assert protocolConverter.lock.isHeldByCurrentThread();
         if (trace) {
             TRACE.trace("Sending: \n" + command);

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java?rev=1402532&r1=1402531&r2=1402532&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java Fri Oct 26 14:39:46 2012
@@ -57,7 +57,7 @@ public class AmqpWireFormat implements W
             Buffer magic = new Buffer(8);
             magic.readFrom(dataIn);
             magicRead = true;
-            return magic;
+            return new AmqpHeader(magic);
         } else {
             int size = dataIn.readInt();
             if( size > maxFrameLength ) {

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java?rev=1402532&r1=1402531&r2=1402532&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java Fri Oct 26 14:39:46 2012
@@ -132,7 +132,7 @@ public class JMSClientTest extends AmqpT
 //    }
 
     private Connection createConnection() throws JMSException {
-        final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, null, null);
+        final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
         final Connection connection = factory.createConnection();
         connection.setExceptionListener(new ExceptionListener() {
             @Override