You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/12/20 15:37:43 UTC
svn commit: r605944 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/stomp/
test/java/org/apache/activemq/transport/stomp/
Author: chirino
Date: Thu Dec 20 06:37:41 2007
New Revision: 605944
URL: http://svn.apache.org/viewvc?rev=605944&view=rev
Log:
Applied Dejan's patch on AMQ-1272 with some small tweaks.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=605944&r1=605943&r2=605944&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Thu Dec 20 06:37:41 2007
@@ -25,7 +25,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQDestination;
@@ -33,11 +32,13 @@
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
@@ -51,6 +52,7 @@
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
@@ -94,17 +96,23 @@
}
}
- protected ResponseHandler createResponseHandler(StompFrame command) {
+ protected ResponseHandler createResponseHandler(final StompFrame command) {
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
- // A response may not be needed.
if (receiptId != null) {
return new ResponseHandler() {
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
- StompFrame sc = new StompFrame();
- sc.setAction(Stomp.Responses.RECEIPT);
- sc.setHeaders(new HashMap<String, String>(1));
- sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
- transportFilter.sendToStomp(sc);
+ if (response.isException()) {
+ // Generally a command can fail.. but that does not invalidate the connection.
+ // We report back the failure but we don't close the connection.
+ Throwable exception = ((ExceptionResponse)response).getException();
+ handleException(exception, command);
+ } else {
+ StompFrame sc = new StompFrame();
+ sc.setAction(Stomp.Responses.RECEIPT);
+ sc.setHeaders(new HashMap<String, String>(1));
+ sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+ transportFilter.sendToStomp(sc);
+ }
}
};
}
@@ -160,28 +168,33 @@
}
} catch (ProtocolException e) {
-
- // Let the stomp client know about any protocol errors.
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
- e.printStackTrace(stream);
- stream.close();
-
- HashMap<String, String> headers = new HashMap<String, String>();
- headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
-
- final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
- if (receiptId != null) {
- headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+ handleException(e, command);
+ // Some protocol errors can cause the connection to get closed.
+ if( e.isFatal() ) {
+ getTransportFilter().onException(e);
}
+ }
+ }
+
+ protected void handleException(Throwable exception, StompFrame command) throws IOException {
+ // Let the stomp client know about any protocol errors.
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+ exception.printStackTrace(stream);
+ stream.close();
- StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
- sendToStomp(errorMessage);
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
- if (e.isFatal()) {
- getTransportFilter().onException(e);
- }
+ if (command != null) {
+ final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+ if (receiptId != null) {
+ headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+ }
}
+
+ StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+ sendToStomp(errorMessage);
}
protected void onStompSend(StompFrame command) throws IOException, JMSException {
@@ -393,7 +406,7 @@
throw new ProtocolException("No subscription matched.");
}
- protected void onStompConnect(StompFrame command) throws ProtocolException {
+ protected void onStompConnect(final StompFrame command) throws ProtocolException {
if (connected.get()) {
throw new ProtocolException("Allready connected.");
@@ -424,13 +437,28 @@
sendToActiveMQ(connectionInfo, new ResponseHandler() {
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+ if (response.isException()) {
+ // If the connection attempt fails we close the socket.
+ Throwable exception = ((ExceptionResponse)response).getException();
+ handleException(exception, command);
+ getTransportFilter().onException(IOExceptionSupport.create(exception));
+ return;
+ }
+
final SessionInfo sessionInfo = new SessionInfo(sessionId);
sendToActiveMQ(sessionInfo, null);
final ProducerInfo producerInfo = new ProducerInfo(producerId);
sendToActiveMQ(producerInfo, new ResponseHandler() {
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
-
+
+ if (response.isException()) {
+ // If the connection attempt fails we close the socket.
+ Throwable exception = ((ExceptionResponse)response).getException();
+ handleException(exception, command);
+ getTransportFilter().onException(IOExceptionSupport.create(exception));
+ }
+
connected.set(true);
HashMap<String, String> responseHeaders = new HashMap<String, String>();
@@ -483,8 +511,13 @@
ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
if (rh != null) {
rh.onResponse(this, response);
+ } else {
+ // Pass down any unexpected errors. Should this close the connection?
+ if (response.isException()) {
+ Throwable exception = ((ExceptionResponse)response).getException();
+ handleException(exception, null);
+ }
}
-
} else if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch)command;
@@ -492,6 +525,10 @@
if (sub != null) {
sub.onMessageDispatch(md);
}
+ } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
+ // Pass down any unexpected async errors. Should this close the connection?
+ Throwable exception = ((ConnectionError)command).getException();
+ handleException(exception, null);
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=605944&r1=605943&r2=605944&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java Thu Dec 20 06:37:41 2007
@@ -30,9 +30,13 @@
private Socket stompSocket;
private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
-
+
public void open(String host, int port) throws IOException, UnknownHostException {
- stompSocket = new Socket(host, port);
+ open(new Socket(host, port));
+ }
+
+ public void open(Socket socket) {
+ stompSocket = socket;
}
public void close() throws IOException {
@@ -75,5 +79,13 @@
}
}
}
+
+ public Socket getStompSocket() {
+ return stompSocket;
+ }
+
+ public void setStompSocket(Socket stompSocket) {
+ this.stompSocket = stompSocket;
+ }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=605944&r1=605943&r2=605944&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Thu Dec 20 06:37:41 2007
@@ -35,49 +35,49 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.transport.reliable.UnreliableUdpTransportTest;
+import org.apache.activemq.security.AuthorizationPlugin;
+import org.apache.activemq.security.SimpleSecurityBrokerSystemTest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class StompTest extends CombinationTestSupport {
private static final Log LOG = LogFactory.getLog(StompTest.class);
- protected String bindAddress = "stomp://localhost:0";
+ protected String bindAddress = "stomp://localhost:61613";
+ protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
private BrokerService broker;
- private TransportConnector connector;
private StompConnection stompConnection = new StompConnection();
private Connection connection;
private Session session;
private ActiveMQQueue queue;
-
+
protected void setUp() throws Exception {
- broker = new BrokerService();
- broker.setPersistent(false);
-
- connector = broker.addConnector(bindAddress);
+ broker = BrokerFactory.createBroker(new URI(confUri));
broker.start();
stompConnect();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
- connection = cf.createConnection();
+ connection = cf.createConnection("system", "manager");
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = new ActiveMQQueue(getQueueName());
connection.start();
}
private void stompConnect() throws IOException, URISyntaxException, UnknownHostException {
- URI connectUri = connector.getConnectUri();
- stompConnection.open("127.0.0.1", connectUri.getPort());
+ URI connectUri = new URI(bindAddress);
+ stompConnection.open(createSocket(connectUri));
}
protected Socket createSocket(URI connectUri) throws IOException {
- return new Socket();
+ return new Socket("127.0.0.1", connectUri.getPort());
}
protected String getQueueName() {
@@ -117,7 +117,7 @@
public void testConnect() throws Exception {
- String connectFrame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
+ String connectFrame = "CONNECT\n" + "login: system\n" + "passcode: manager\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
@@ -130,7 +130,7 @@
MessageConsumer consumer = session.createConsumer(queue);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
@@ -155,7 +155,7 @@
MessageConsumer consumer = session.createConsumer(queue);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
@@ -174,7 +174,7 @@
MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
@@ -195,7 +195,7 @@
MessageConsumer consumer = session.createConsumer(queue);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
@@ -222,7 +222,7 @@
public void testSubscribeWithAutoAck() throws Exception {
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
@@ -242,7 +242,7 @@
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
@@ -271,7 +271,7 @@
public void testSubscribeWithMessageSentWithProperties() throws Exception {
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
@@ -305,7 +305,7 @@
int ctr = 10;
String[] data = new String[ctr];
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
@@ -343,7 +343,7 @@
public void testSubscribeWithAutoAckAndSelector() throws Exception {
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
@@ -365,7 +365,7 @@
public void testSubscribeWithClientAck() throws Exception {
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
@@ -389,7 +389,7 @@
public void testUnsubscribe() throws Exception {
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
@@ -426,7 +426,7 @@
public void testTransactionCommit() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String f = stompConnection.receiveFrame();
@@ -450,7 +450,7 @@
public void testTransactionRollback() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String f = stompConnection.receiveFrame();
@@ -486,7 +486,7 @@
public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception {
assertClients(1);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
@@ -502,7 +502,61 @@
assertClients(1);
}
+
+ public void testConnectNotAuthenticatedWrongUser() throws Exception {
+ String frame = "CONNECT\n" + "login: dejanb\n" + "passcode: manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ String f = stompConnection.receiveFrame();
+
+ assertTrue(f.startsWith("ERROR"));
+ assertClients(1);
+
+ }
+
+ public void testConnectNotAuthenticatedWrongPassword() throws Exception {
+
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: dejanb\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ String f = stompConnection.receiveFrame();
+
+ assertTrue(f.startsWith("ERROR"));
+ assertClients(1);
+ }
+
+ public void testSendNotAuthorized() throws Exception {
+
+ String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SEND\n" + "destination:/queue/USERS." + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+
+ stompConnection.sendFrame(frame);
+ String f = stompConnection.receiveFrame();
+ assertTrue(f.startsWith("ERROR"));
+
+ }
+
+ public void testSubscribeNotAuthorized() throws Exception {
+
+ String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(frame);
+ String f = stompConnection.receiveFrame();
+ assertTrue(f.startsWith("ERROR"));
+ }
+
protected void assertClients(int expected) throws Exception {
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
int actual = clients.length;