You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/26 16:39:47 UTC
svn commit: r1402532 - in /activemq/trunk/activemq-amqp/src:
main/java/org/apache/activemq/transport/amqp/
test/java/org/apache/activemq/transport/amqp/
Author: chirino
Date: Fri Oct 26 14:39:46 2012
New Revision: 1402532
URL: http://svn.apache.org/viewvc?rev=1402532&view=rev
Log:
PLAIN Sasl is now working.
Added:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
Added: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java?rev=1402532&view=auto
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java (added)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java Fri Oct 26 14:39:46 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpHeader {
+
+ static final Buffer PREFIX = new Buffer(new byte[]{
+ 'A', 'M', 'Q', 'P'
+ });
+
+ private Buffer buffer;
+
+ public AmqpHeader(){
+ this(new Buffer(new byte[]{
+ 'A', 'M', 'Q', 'P', 0, 1, 0, 0
+ }));
+ }
+
+ public AmqpHeader(Buffer buffer){
+ setBuffer(buffer);
+ }
+
+ public int getProtocolId() {
+ return buffer.get(4) & 0xFF;
+ }
+ public void setProtocolId(int value) {
+ buffer.data[buffer.offset+4] = (byte) value;
+ }
+
+ public int getMajor() {
+ return buffer.get(5) & 0xFF;
+ }
+ public void setMajor(int value) {
+ buffer.data[buffer.offset+5] = (byte) value;
+ }
+
+ public int getMinor() {
+ return buffer.get(6) & 0xFF;
+ }
+ public void setMinor(int value) {
+ buffer.data[buffer.offset+6] = (byte) value;
+ }
+
+ public int getRevision() {
+ return buffer.get(7) & 0xFF;
+ }
+ public void setRevision(int value) {
+ buffer.data[buffer.offset+7] = (byte) value;
+ }
+
+ public Buffer getBuffer() {
+ return buffer;
+ }
+ public void setBuffer(Buffer value) {
+ if( !value.startsWith(PREFIX) || value.length()!=8 ) {
+ throw new IllegalArgumentException("Not an AMQP header buffer");
+ }
+ buffer = value.buffer();
+ }
+
+
+ @Override
+ public String toString() {
+ return buffer.toString();
+ }
+}
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1402532&r1=1402531&r2=1402532&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Fri Oct 26 14:39:46 2012
@@ -22,12 +22,8 @@ import org.apache.activemq.transport.amq
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
-import org.apache.qpid.proton.codec.Decoder;
-import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.engine.*;
-import org.apache.qpid.proton.engine.impl.ConnectionImpl;
-import org.apache.qpid.proton.engine.impl.ProtocolTracer;
-import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.apache.qpid.proton.engine.impl.*;
import org.apache.qpid.proton.framing.TransportFrame;
import org.apache.qpid.proton.type.Binary;
import org.apache.qpid.proton.type.DescribedType;
@@ -45,7 +41,6 @@ import org.fusesource.hawtbuf.ByteArrayO
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.JMSException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
@@ -141,79 +136,128 @@ class AmqpProtocolConverter {
}
}
+ Sasl sasl;
+
/**
* Convert a AMQP command
*/
- public void onAMQPData(Buffer frame) throws IOException, JMSException {
-
-
- try {
-// System.out.println("reading: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
- protonTransport.input(frame.data, frame.offset, frame.length);
- } catch (Throwable e) {
- handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
+ public void onAMQPData(Object command) throws Exception {
+ Buffer frame;
+ if( command.getClass() == AmqpHeader.class ) {
+ AmqpHeader header = (AmqpHeader)command;
+ switch( header.getProtocolId() ) {
+ case 0:
+ // amqpTransport.sendToAmqp(new AmqpHeader());
+ break; // nothing to do..
+ case 3: // Client will be using SASL for auth..
+ sasl = protonTransport.sasl();
+ sasl.setMechanisms(new String[]{"ANONYMOUS", "PLAIN"});
+ sasl.server();
+ break;
+ default:
+ }
+ frame = header.getBuffer();
+ } else {
+ frame = (Buffer)command;
}
+ onFrame(frame);
+ }
- try {
+ public void onFrame(Buffer frame) throws Exception {
+// System.out.println("read: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
+ while( frame.length > 0 ) {
+ try {
+ int count = protonTransport.input(frame.data, frame.offset, frame.length);
+ frame.moveHead(count);
+ } catch (Throwable e) {
+ handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
+ }
+ try {
+
+ if( sasl!=null ) {
+ // Lets try to complete the sasl handshake.
+ if( sasl.getRemoteMechanisms().length > 0 ) {
+ if( "PLAIN".equals(sasl.getRemoteMechanisms()[0]) ) {
+ byte[] data = new byte[sasl.pending()];
+ sasl.recv(data, 0, data.length);
+ Buffer[] parts = new Buffer(data).split((byte) 0);
+ if( parts.length > 0 ) {
+ connectionInfo.setUserName(parts[0].utf8().toString());
+ }
+ if( parts.length > 1 ) {
+ connectionInfo.setPassword(parts[1].utf8().toString());
+ }
+ // We can't really auth at this point since we don't know the client id yet.. :(
+ sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+ amqpTransport.getWireFormat().magicRead = false;
+ sasl = null;
+ } else if( "ANONYMOUS".equals(sasl.getRemoteMechanisms()[0]) ) {
+ sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+ amqpTransport.getWireFormat().magicRead = false;
+ sasl = null;
+ }
+ }
+ }
- // Handle the amqp open..
- if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED) {
- onConnectionOpen();
- }
+ // Handle the amqp open..
+ if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED) {
+ onConnectionOpen();
+ }
- // Lets map amqp sessions to openwire sessions..
- Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
- while (session != null) {
+ // Lets map amqp sessions to openwire sessions..
+ Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
+ while (session != null) {
- onSessionOpen(session);
- session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
- }
+ onSessionOpen(session);
+ session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
+ }
- Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
- while (link != null) {
- onLinkOpen(link);
- link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
- }
+ Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
+ while (link != null) {
+ onLinkOpen(link);
+ link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
+ }
- Delivery delivery = protonConnection.getWorkHead();
- while (delivery != null) {
- AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
- if (listener != null) {
- listener.onDelivery(delivery);
+ Delivery delivery = protonConnection.getWorkHead();
+ while (delivery != null) {
+ AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
+ if (listener != null) {
+ listener.onDelivery(delivery);
+ }
+ delivery = delivery.getWorkNext();
}
- delivery = delivery.getWorkNext();
- }
- link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
- while (link != null) {
- ((AmqpDeliveryListener)link.getContext()).onClose();
- link.close();
- link = link.next(ACTIVE_STATE, CLOSED_STATE);
- }
+ link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
+ while (link != null) {
+ ((AmqpDeliveryListener)link.getContext()).onClose();
+ link.close();
+ link = link.next(ACTIVE_STATE, CLOSED_STATE);
+ }
- link = protonConnection.linkHead(ACTIVE_STATE, ALL_STATES);
- while (link != null) {
- ((AmqpDeliveryListener)link.getContext()).drainCheck();
- link = link.next(ACTIVE_STATE, CLOSED_STATE);
- }
+ link = protonConnection.linkHead(ACTIVE_STATE, ALL_STATES);
+ while (link != null) {
+ ((AmqpDeliveryListener)link.getContext()).drainCheck();
+ link = link.next(ACTIVE_STATE, CLOSED_STATE);
+ }
- session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
- while (session != null) {
- //TODO - close links?
- onSessionClose(session);
- session = session.next(ACTIVE_STATE, CLOSED_STATE);
- }
- if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
- doClose();
+ session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
+ while (session != null) {
+ //TODO - close links?
+ onSessionClose(session);
+ session = session.next(ACTIVE_STATE, CLOSED_STATE);
+ }
+ if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
+ doClose();
+ }
+
+ } catch (Throwable e) {
+ handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
}
- } catch (Throwable e) {
- handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
+ pumpProtonToSocket();
}
-
- pumpProtonToSocket();
}
boolean closing = false;
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java?rev=1402532&r1=1402531&r2=1402532&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java Fri Oct 26 14:39:46 2012
@@ -29,7 +29,7 @@ public interface AmqpTransport {
public void sendToActiveMQ(Command command);
- public void sendToAmqp(Buffer command) throws IOException;
+ public void sendToAmqp(Object command) throws IOException;
public X509Certificate[] getPeerCertificates();
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java?rev=1402532&r1=1402531&r2=1402532&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java Fri Oct 26 14:39:46 2012
@@ -93,13 +93,13 @@ public class AmqpTransportFilter extends
}
protocolConverter.lock.lock();
try {
- protocolConverter.onAMQPData((Buffer) command);
+ protocolConverter.onAMQPData(command);
} finally {
protocolConverter.lock.unlock();
}
} catch (IOException e) {
handleException(e);
- } catch (JMSException e) {
+ } catch (Exception e) {
onException(IOExceptionSupport.create(e));
}
}
@@ -112,7 +112,7 @@ public class AmqpTransportFilter extends
}
}
- public void sendToAmqp(Buffer command) throws IOException {
+ public void sendToAmqp(Object command) throws IOException {
assert protocolConverter.lock.isHeldByCurrentThread();
if (trace) {
TRACE.trace("Sending: \n" + command);
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java?rev=1402532&r1=1402531&r2=1402532&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java Fri Oct 26 14:39:46 2012
@@ -57,7 +57,7 @@ public class AmqpWireFormat implements W
Buffer magic = new Buffer(8);
magic.readFrom(dataIn);
magicRead = true;
- return magic;
+ return new AmqpHeader(magic);
} else {
int size = dataIn.readInt();
if( size > maxFrameLength ) {
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java?rev=1402532&r1=1402531&r2=1402532&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java Fri Oct 26 14:39:46 2012
@@ -132,7 +132,7 @@ public class JMSClientTest extends AmqpT
// }
private Connection createConnection() throws JMSException {
- final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, null, null);
+ final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
final Connection connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override