You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/02 17:00:14 UTC
svn commit: r418602 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/
Author: chirino
Date: Sun Jul 2 08:00:13 2006
New Revision: 418602
URL: http://svn.apache.org/viewvc?rev=418602&view=rev
Log:
Better protocol error handling.
Fixed http://issues.apache.org/activemq/browse/AMQ-649
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolException.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrame.java
- copied, changed from r418550, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrameError.java
Removed:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java?rev=418602&r1=418601&r2=418602&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java Sun Jul 2 08:00:13 2006
@@ -19,7 +19,6 @@
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
-import java.net.ProtocolException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -87,13 +86,13 @@
}
}
- protected ResponseHandler createResponseHandler(StompCommand command){
+ protected ResponseHandler createResponseHandler(StompFrame command){
final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
// A response may not be needed.
if( receiptId != null ) {
return new ResponseHandler() {
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
- StompCommand sc = new StompCommand();
+ StompFrame sc = new StompFrame();
sc.setHeaders(new HashMap(5));
sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
transportFilter.sendToStomp(sc);
@@ -112,7 +111,7 @@
transportFilter.sendToActiveMQ(command);
}
- protected void sendToStomp(StompCommand command) throws IOException {
+ protected void sendToStomp(StompFrame command) throws IOException {
transportFilter.sendToStomp(command);
}
@@ -120,9 +119,13 @@
* Convert a stomp command
* @param command
*/
- public void onStompCommad( StompCommand command ) throws IOException, JMSException {
+ public void onStompCommad( StompFrame command ) throws IOException, JMSException {
try {
+ if( command.getClass() == StompFrameError.class ) {
+ throw ((StompFrameError)command).getException();
+ }
+
String action = command.getAction();
if (action.startsWith(Stomp.Commands.SEND))
onStompSend(command);
@@ -161,13 +164,15 @@
headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
}
- StompCommand errorMessage = new StompCommand(Stomp.Responses.ERROR,headers,baos.toByteArray());
+ StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR,headers,baos.toByteArray());
sendToStomp(errorMessage);
+ if( e.isFatal() )
+ getTransportFilter().onException(e);
}
}
- protected void onStompSend(StompCommand command) throws IOException, JMSException {
+ protected void onStompSend(StompFrame command) throws IOException, JMSException {
checkConnected();
Map headers = command.getHeaders();
@@ -193,7 +198,7 @@
}
- protected void onStompAck(StompCommand command) throws ProtocolException {
+ protected void onStompAck(StompFrame command) throws ProtocolException {
checkConnected();
// TODO: acking with just a message id is very bogus
@@ -231,7 +236,7 @@
}
- protected void onStompBegin(StompCommand command) throws ProtocolException {
+ protected void onStompBegin(StompFrame command) throws ProtocolException {
checkConnected();
Map headers = command.getHeaders();
@@ -258,7 +263,7 @@
}
- protected void onStompCommit(StompCommand command) throws ProtocolException {
+ protected void onStompCommit(StompFrame command) throws ProtocolException {
checkConnected();
Map headers = command.getHeaders();
@@ -283,7 +288,7 @@
sendToActiveMQ(tx, createResponseHandler(command));
}
- protected void onStompAbort(StompCommand command) throws ProtocolException {
+ protected void onStompAbort(StompFrame command) throws ProtocolException {
checkConnected();
Map headers = command.getHeaders();
@@ -308,7 +313,7 @@
}
- protected void onStompSubscribe(StompCommand command) throws ProtocolException {
+ protected void onStompSubscribe(StompFrame command) throws ProtocolException {
checkConnected();
Map headers = command.getHeaders();
@@ -343,7 +348,7 @@
}
- protected void onStompUnsubscribe(StompCommand command) throws ProtocolException {
+ protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
checkConnected();
Map headers = command.getHeaders();
@@ -375,7 +380,7 @@
throw new ProtocolException("No subscription matched.");
}
- protected void onStompConnect(StompCommand command) throws ProtocolException {
+ protected void onStompConnect(StompFrame command) throws ProtocolException {
if(connected.get()) {
throw new ProtocolException("Allready connected.");
@@ -422,7 +427,7 @@
responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
}
- StompCommand sc = new StompCommand();
+ StompFrame sc = new StompFrame();
sc.setAction(Stomp.Responses.CONNECTED);
sc.setHeaders(responseHeaders);
sendToStomp(sc);
@@ -434,7 +439,7 @@
}
- protected void onStompDisconnect(StompCommand command) throws ProtocolException {
+ protected void onStompDisconnect(StompFrame command) throws ProtocolException {
checkConnected();
sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
connected.set(false);
@@ -473,7 +478,7 @@
}
- public ActiveMQMessage convertMessage(StompCommand command) throws IOException, JMSException {
+ public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
Map headers = command.getHeaders();
// now the body
@@ -488,7 +493,7 @@
try {
text.setText(new String(command.getContent(), "UTF-8"));
} catch (Throwable e) {
- throw (ProtocolException)new ProtocolException("Text could not bet set: "+e).initCause(e);
+ throw new ProtocolException("Text could not bet set: "+e, false, e);
}
msg = text;
}
@@ -530,9 +535,9 @@
return msg;
}
- public StompCommand convertMessage(ActiveMQMessage message) throws IOException, JMSException {
+ public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
- StompCommand command = new StompCommand();
+ StompFrame command = new StompFrame();
command.setAction(Stomp.Responses.MESSAGE);
HashMap headers = new HashMap();
@@ -620,8 +625,4 @@
this.transportFilter = transportFilter;
}
- public void onStompExcepton(IOException error) {
- // TODO Auto-generated method stub
- }
-
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolException.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolException.java?rev=418602&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolException.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolException.java Sun Jul 2 08:00:13 2006
@@ -0,0 +1,50 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.IOException;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class ProtocolException extends IOException {
+
+ private static final long serialVersionUID = -2869735532997332242L;
+
+ private final boolean fatal;
+
+ public ProtocolException() {
+ this(null);
+ }
+ public ProtocolException(String s) {
+ this(s, false);
+ }
+ public ProtocolException(String s, boolean fatal) {
+ this(s,fatal, null);
+ }
+ public ProtocolException(String s, boolean fatal, Throwable cause) {
+ super(s);
+ this.fatal = fatal;
+ initCause(cause);
+ }
+
+ public boolean isFatal() {
+ return fatal;
+ }
+
+}
Copied: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrame.java (from r418550, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java)
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrame.java?p2=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrame.java&p1=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java&r1=418550&r2=418602&rev=418602&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrame.java Sun Jul 2 08:00:13 2006
@@ -30,7 +30,7 @@
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
-public class StompCommand implements Command {
+public class StompFrame implements Command {
private static final byte[] NO_DATA = new byte[]{};
@@ -38,13 +38,13 @@
private Map headers = Collections.EMPTY_MAP;
private byte[] content = NO_DATA;
- public StompCommand(String command, HashMap headers, byte[] data) {
+ public StompFrame(String command, HashMap headers, byte[] data) {
this.action = command;
this.headers = headers;
this.content = data;
}
- public StompCommand() {
+ public StompFrame() {
}
public String getAction() {
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrameError.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrameError.java?rev=418602&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrameError.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompFrameError.java Sun Jul 2 08:00:13 2006
@@ -0,0 +1,38 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+/**
+ * Command indicating that an invalid Stomp Frame was received.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompFrameError extends StompFrame {
+
+
+ private final ProtocolException exception;
+
+ public StompFrameError(ProtocolException exception) {
+ this.exception = exception;
+ }
+
+ public ProtocolException getException() {
+ return exception;
+ }
+
+
+}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java?rev=418602&r1=418601&r2=418602&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java Sun Jul 2 08:00:13 2006
@@ -69,7 +69,7 @@
protocolConverter.getTransportFilter().sendToActiveMQ(ack);
}
- StompCommand command = protocolConverter.convertMessage(message);
+ StompFrame command = protocolConverter.convertMessage(message);
command.setAction(Stomp.Responses.MESSAGE);
if (subscriptionId!=null) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java?rev=418602&r1=418601&r2=418602&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java Sun Jul 2 08:00:13 2006
@@ -46,14 +46,6 @@
protocolConverter.setTransportFilter(this);
}
- public void start() throws Exception {
- super.start();
- }
-
- public void stop() throws Exception {
- super.stop();
- }
-
public void oneway(Command command) throws IOException {
try {
protocolConverter.onActiveMQCommad(command);
@@ -64,7 +56,7 @@
public void onCommand(Command command) {
try {
- protocolConverter.onStompCommad((StompCommand) command);
+ protocolConverter.onStompCommad((StompFrame) command);
} catch (IOException e) {
onException(e);
} catch (JMSException e) {
@@ -72,24 +64,16 @@
}
}
- public void onException(IOException error) {
- protocolConverter.onStompExcepton(error);
- transportListener.onException(error);
- }
-
-
public void sendToActiveMQ(Command command) {
synchronized(sendToActiveMQMutex) {
transportListener.onCommand(command);
}
}
- public void sendToStomp(StompCommand command) throws IOException {
+ public void sendToStomp(StompFrame command) throws IOException {
synchronized(sendToStompMutex) {
next.oneway(command);
}
}
-
-
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java?rev=418602&r1=418601&r2=418602&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java Sun Jul 2 08:00:13 2006
@@ -19,7 +19,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.net.ProtocolException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -62,7 +61,7 @@
}
public void marshal(Object command, DataOutputStream os) throws IOException {
- StompCommand stomp = (org.apache.activemq.transport.stomp2.StompCommand) command;
+ StompFrame stomp = (org.apache.activemq.transport.stomp2.StompFrame) command;
StringBuffer buffer = new StringBuffer();
buffer.append(stomp.getAction());
@@ -88,92 +87,97 @@
public Object unmarshal(DataInputStream in) throws IOException {
- String action = null;
-
- // skip white space to next real action line
- while (true) {
- action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
- if (action == null) {
- throw new IOException("connection was closed");
- } else {
- action = action.trim();
- if (action.length() > 0) {
- break;
+ try {
+ String action = null;
+
+ // skip white space to next real action line
+ while (true) {
+ action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+ if (action == null) {
+ throw new IOException("connection was closed");
+ } else {
+ action = action.trim();
+ if (action.length() > 0) {
+ break;
+ }
}
}
- }
-
- // Parse the headers
- HashMap headers = new HashMap(25);
- while (true) {
- String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
- if (line != null && line.trim().length() > 0) {
-
- if( headers.size() > MAX_HEADERS )
- throw new ProtocolException("The maximum number of headers was exceeded");
-
- try {
- int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
- String name = line.substring(0, seperator_index).trim();
- String value = line.substring(seperator_index + 1, line.length()).trim();
- headers.put(name, value);
- }
- catch (Exception e) {
- throw new ProtocolException("Unable to parser header line [" + line + "]");
- }
- }
- else {
- break;
- }
- }
-
- // Read in the data part.
- byte[] data = NO_DATA;
- String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
- if (contentLength!=null) {
-
- // Bless the client, he's telling us how much data to read in.
- int length;
- try {
- length = Integer.parseInt(contentLength.trim());
- } catch (NumberFormatException e) {
- throw new ProtocolException("Specified content-length is not a valid integer");
+
+ // Parse the headers
+ HashMap headers = new HashMap(25);
+ while (true) {
+ String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+ if (line != null && line.trim().length() > 0) {
+
+ if( headers.size() > MAX_HEADERS )
+ throw new ProtocolException("The maximum number of headers was exceeded", true);
+
+ try {
+ int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+ String name = line.substring(0, seperator_index).trim();
+ String value = line.substring(seperator_index + 1, line.length()).trim();
+ headers.put(name, value);
+ }
+ catch (Exception e) {
+ throw new ProtocolException("Unable to parser header line [" + line + "]", true);
+ }
+ }
+ else {
+ break;
+ }
}
+
+ // Read in the data part.
+ byte[] data = NO_DATA;
+ String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
+ if (contentLength!=null) {
+
+ // Bless the client, he's telling us how much data to read in.
+ int length;
+ try {
+ length = Integer.parseInt(contentLength.trim());
+ } catch (NumberFormatException e) {
+ throw new ProtocolException("Specified content-length is not a valid integer", true);
+ }
- if( length > MAX_DATA_LENGTH )
- throw new ProtocolException("The maximum data length was exceeded");
+ if( length > MAX_DATA_LENGTH )
+ throw new ProtocolException("The maximum data length was exceeded", true);
+
+ data = new byte[length];
+ in.readFully(data);
+
+ if (in.readByte() != 0) {
+ throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte", true);
+ }
+
+ } else {
+
+ // We don't know how much to read.. data ends when we hit a 0
+ byte b;
+ ByteArrayOutputStream baos=null;
+ while ((b = in.readByte()) != 0) {
+
+ if( baos == null ) {
+ baos = new ByteArrayOutputStream();
+ } else if( baos.size() > MAX_DATA_LENGTH ) {
+ throw new ProtocolException("The maximum data length was exceeded", true);
+ }
+
+ baos.write(b);
+ }
+
+ if( baos!=null ) {
+ baos.close();
+ data = baos.toByteArray();
+ }
+
+ }
+
+ return new StompFrame(action, headers, data);
- data = new byte[length];
- in.readFully(data);
-
- if (in.readByte() != 0) {
- throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte");
- }
-
- } else {
-
- // We don't know how much to read.. data ends when we hit a 0
- byte b;
- ByteArrayOutputStream baos=null;
- while ((b = in.readByte()) != 0) {
-
- if( baos == null ) {
- baos = new ByteArrayOutputStream();
- } else if( baos.size() > MAX_DATA_LENGTH ) {
- throw new ProtocolException("The maximum data length was exceeded");
- }
-
- baos.write(b);
- }
-
- if( baos!=null ) {
- baos.close();
- data = baos.toByteArray();
- }
-
- }
-
- return new StompCommand(action, headers, data);
+ } catch (ProtocolException e) {
+ return new StompFrameError(e);
+ }
}
@@ -182,7 +186,7 @@
ByteArrayOutputStream baos=new ByteArrayOutputStream(maxLength);
while ((b = in.readByte()) != '\n') {
if( baos.size() > maxLength )
- throw new ProtocolException(errorMessage);
+ throw new ProtocolException(errorMessage, true);
baos.write(b);
}
ByteSequence sequence = baos.toByteSequence();