You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by Gary Tully <ga...@gmail.com> on 2011/03/22 00:37:15 UTC
Re: svn commit: r1083988 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp:
StompConnection.java StompFrame.java
was behind you by an hour with this fix, my bad, fix is the same
though, just a noun change :-)
On 21 March 2011 22:11, <ta...@apache.org> wrote:
> Author: tabish
> Date: Mon Mar 21 22:11:19 2011
> New Revision: 1083988
>
> URL: http://svn.apache.org/viewvc?rev=1083988&view=rev
> Log:
> fix test breakages caused by the initial fix for: https://issues.apache.org/jira/browse/AMQ-3231
>
> Modified:
> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
>
> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=1083988&r1=1083987&r2=1083988&view=diff
> ==============================================================================
> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Mon Mar 21 22:11:19 2011
> @@ -32,13 +32,13 @@ public class StompConnection {
>
> private Socket stompSocket;
> private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
> -
> +
> public void open(String host, int port) throws IOException, UnknownHostException {
> open(new Socket(host, port));
> }
> -
> +
> public void open(Socket socket) {
> - stompSocket = socket;
> + stompSocket = socket;
> }
>
> public void close() throws IOException {
> @@ -55,23 +55,23 @@ public class StompConnection {
> outputStream.write(0);
> outputStream.flush();
> }
> -
> +
> public void sendFrame(String frame, byte[] data) throws Exception {
> byte[] bytes = frame.getBytes("UTF-8");
> OutputStream outputStream = stompSocket.getOutputStream();
> outputStream.write(bytes);
> outputStream.write(data);
> outputStream.write(0);
> - outputStream.flush();
> + outputStream.flush();
> }
> -
> +
> public StompFrame receive() throws Exception {
> return receive(RECEIVE_TIMEOUT);
> - }
> -
> + }
> +
> public StompFrame receive(long timeOut) throws Exception {
> - stompSocket.setSoTimeout((int)timeOut);
> - InputStream is = stompSocket.getInputStream();
> + stompSocket.setSoTimeout((int)timeOut);
> + InputStream is = stompSocket.getInputStream();
> StompWireFormat wf = new StompWireFormat();
> DataInputStream dis = new DataInputStream(is);
> return (StompFrame)wf.unmarshal(dis);
> @@ -104,143 +104,143 @@ public class StompConnection {
> }
> }
>
> - private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
> - byte[] ba = inputBuffer.toByteArray();
> + private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
> + byte[] ba = inputBuffer.toByteArray();
> inputBuffer.reset();
> return new String(ba, "UTF-8");
> }
>
> public Socket getStompSocket() {
> - return stompSocket;
> - }
> + return stompSocket;
> + }
> +
> + public void setStompSocket(Socket stompSocket) {
> + this.stompSocket = stompSocket;
> + }
>
> - public void setStompSocket(Socket stompSocket) {
> - this.stompSocket = stompSocket;
> - }
> -
> public void connect(String username, String password) throws Exception {
> connect(username, password, null);
> }
> -
> +
> public void connect(String username, String password, String client) throws Exception {
> - HashMap<String, String> headers = new HashMap();
> - headers.put("login", username);
> - headers.put("passcode", password);
> - if (client != null) {
> - headers.put("client-id", client);
> - }
> - StompFrame frame = new StompFrame("CONNECT", headers);
> - sendFrame(frame.toString());
> -
> + HashMap<String, String> headers = new HashMap<String, String>();
> + headers.put("login", username);
> + headers.put("passcode", password);
> + if (client != null) {
> + headers.put("client-id", client);
> + }
> + StompFrame frame = new StompFrame("CONNECT", headers);
> + sendFrame(frame.marshal());
> +
> StompFrame connect = receive();
> if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
> - throw new Exception ("Not connected: " + connect.getBody());
> + throw new Exception ("Not connected: " + connect.getBody());
> }
> }
> -
> +
> public void disconnect() throws Exception {
> - StompFrame frame = new StompFrame("DISCONNECT");
> - sendFrame(frame.toString());
> + StompFrame frame = new StompFrame("DISCONNECT");
> + sendFrame(frame.toString());
> }
> -
> +
> public void send(String destination, String message) throws Exception {
> - send(destination, message, null, null);
> + send(destination, message, null, null);
> }
> -
> +
> public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
> - if (headers == null) {
> - headers = new HashMap<String, String>();
> - }
> - headers.put("destination", destination);
> - if (transaction != null) {
> - headers.put("transaction", transaction);
> - }
> - StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
> - sendFrame(frame.toString());
> + if (headers == null) {
> + headers = new HashMap<String, String>();
> + }
> + headers.put("destination", destination);
> + if (transaction != null) {
> + headers.put("transaction", transaction);
> + }
> + StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
> + sendFrame(frame.toString());
> }
> -
> +
> public void subscribe(String destination) throws Exception {
> - subscribe(destination, null, null);
> + subscribe(destination, null, null);
> }
> -
> +
> public void subscribe(String destination, String ack) throws Exception {
> - subscribe(destination, ack, new HashMap<String, String>());
> + subscribe(destination, ack, new HashMap<String, String>());
> }
> -
> +
> public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
> - if (headers == null) {
> - headers = new HashMap<String, String>();
> - }
> - headers.put("destination", destination);
> - if (ack != null) {
> - headers.put("ack", ack);
> - }
> - StompFrame frame = new StompFrame("SUBSCRIBE", headers);
> - sendFrame(frame.toString());
> + if (headers == null) {
> + headers = new HashMap<String, String>();
> + }
> + headers.put("destination", destination);
> + if (ack != null) {
> + headers.put("ack", ack);
> + }
> + StompFrame frame = new StompFrame("SUBSCRIBE", headers);
> + sendFrame(frame.toString());
> }
> -
> +
> public void unsubscribe(String destination) throws Exception {
> - unsubscribe(destination, null);
> + unsubscribe(destination, null);
> }
> -
> +
> public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
> - if (headers == null) {
> - headers = new HashMap<String, String>();
> - }
> - headers.put("destination", destination);
> - StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
> - sendFrame(frame.toString());
> - }
> -
> + if (headers == null) {
> + headers = new HashMap<String, String>();
> + }
> + headers.put("destination", destination);
> + StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
> + sendFrame(frame.toString());
> + }
> +
> public void begin(String transaction) throws Exception {
> - HashMap<String, String> headers = new HashMap<String, String>();
> - headers.put("transaction", transaction);
> - StompFrame frame = new StompFrame("BEGIN", headers);
> - sendFrame(frame.toString());
> + HashMap<String, String> headers = new HashMap<String, String>();
> + headers.put("transaction", transaction);
> + StompFrame frame = new StompFrame("BEGIN", headers);
> + sendFrame(frame.toString());
> }
> -
> +
> public void abort(String transaction) throws Exception {
> - HashMap<String, String> headers = new HashMap<String, String>();
> - headers.put("transaction", transaction);
> - StompFrame frame = new StompFrame("ABORT", headers);
> - sendFrame(frame.toString());
> + HashMap<String, String> headers = new HashMap<String, String>();
> + headers.put("transaction", transaction);
> + StompFrame frame = new StompFrame("ABORT", headers);
> + sendFrame(frame.toString());
> }
> -
> +
> public void commit(String transaction) throws Exception {
> - HashMap<String, String> headers = new HashMap<String, String>();
> - headers.put("transaction", transaction);
> - StompFrame frame = new StompFrame("COMMIT", headers);
> - sendFrame(frame.toString());
> + HashMap<String, String> headers = new HashMap<String, String>();
> + headers.put("transaction", transaction);
> + StompFrame frame = new StompFrame("COMMIT", headers);
> + sendFrame(frame.toString());
> }
> -
> +
> public void ack(StompFrame frame) throws Exception {
> - ack(frame.getHeaders().get("message-id"), null);
> - }
> -
> + ack(frame.getHeaders().get("message-id"), null);
> + }
> +
> public void ack(StompFrame frame, String transaction) throws Exception {
> - ack(frame.getHeaders().get("message-id"), transaction);
> + ack(frame.getHeaders().get("message-id"), transaction);
> }
> -
> +
> public void ack(String messageId) throws Exception {
> - ack(messageId, null);
> + ack(messageId, null);
> }
> -
> +
> public void ack(String messageId, String transaction) throws Exception {
> - HashMap<String, String> headers = new HashMap<String, String>();
> - headers.put("message-id", messageId);
> - if (transaction != null)
> - headers.put("transaction", transaction);
> - StompFrame frame = new StompFrame("ACK", headers);
> - sendFrame(frame.toString());
> + HashMap<String, String> headers = new HashMap<String, String>();
> + headers.put("message-id", messageId);
> + if (transaction != null)
> + headers.put("transaction", transaction);
> + StompFrame frame = new StompFrame("ACK", headers);
> + sendFrame(frame.toString());
> }
> -
> +
> protected String appendHeaders(HashMap<String, Object> headers) {
> - StringBuffer result = new StringBuffer();
> - for (String key : headers.keySet()) {
> - result.append(key + ":" + headers.get(key) + "\n");
> - }
> - result.append("\n");
> - return result.toString();
> + StringBuffer result = new StringBuffer();
> + for (String key : headers.keySet()) {
> + result.append(key + ":" + headers.get(key) + "\n");
> + }
> + result.append("\n");
> + return result.toString();
> }
>
> }
>
> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java?rev=1083988&r1=1083987&r2=1083988&view=diff
> ==============================================================================
> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java (original)
> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java Mon Mar 21 22:11:19 2011
> @@ -18,7 +18,6 @@ package org.apache.activemq.transport.st
>
> import java.io.UnsupportedEncodingException;
> import java.util.Arrays;
> -import java.util.Collections;
> import java.util.HashMap;
> import java.util.Iterator;
> import java.util.Map;
> @@ -30,7 +29,7 @@ import org.apache.activemq.state.Command
>
> /**
> * Represents all the data in a STOMP frame.
> - *
> + *
> * @author <a href="http://hiramchirino.com">chirino</a>
> */
> public class StompFrame implements Command {
> @@ -42,21 +41,21 @@ public class StompFrame implements Comma
> private byte[] content = NO_DATA;
>
> public StompFrame(String command) {
> - this(command, null, null);
> + this(command, null, null);
> }
> -
> +
> public StompFrame(String command, Map<String, String> headers) {
> - this(command, headers, null);
> - }
> -
> + this(command, headers, null);
> + }
> +
> public StompFrame(String command, Map<String, String> headers, byte[] data) {
> this.action = command;
> if (headers != null)
> - this.headers = headers;
> + this.headers = headers;
> if (data != null)
> - this.content = data;
> + this.content = data;
> }
> -
> +
> public StompFrame() {
> }
>
> @@ -71,13 +70,13 @@ public class StompFrame implements Comma
> public byte[] getContent() {
> return content;
> }
> -
> +
> public String getBody() {
> - try {
> - return new String(content, "UTF-8");
> - } catch (UnsupportedEncodingException e) {
> - return new String(content);
> - }
> + try {
> + return new String(content, "UTF-8");
> + } catch (UnsupportedEncodingException e) {
> + return new String(content);
> + }
> }
>
> public void setContent(byte[] data) {
> @@ -138,7 +137,7 @@ public class StompFrame implements Comma
> public boolean isShutdownInfo() {
> return false;
> }
> -
> +
> public boolean isConnectionControl() {
> return false;
> }
> @@ -171,16 +170,24 @@ public class StompFrame implements Comma
> return false;
> }
>
> + public String marshal() {
> + return toString(false);
> + }
> +
> public String toString() {
> + return toString(true);
> + }
> +
> + private String toString(boolean hidePasscode) {
> StringBuffer buffer = new StringBuffer();
> buffer.append(getAction());
> buffer.append("\n");
> - Map headers = getHeaders();
> - for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
> - Map.Entry entry = (Map.Entry)iter.next();
> + Map<String, String> headers = getHeaders();
> + for (Iterator<Map.Entry<String,String>> iter = headers.entrySet().iterator(); iter.hasNext();) {
> + Map.Entry<String, String> entry = (Map.Entry<String,String>)iter.next();
> buffer.append(entry.getKey());
> buffer.append(":");
> - if (entry.getKey().toString().toLowerCase().contains(Stomp.Headers.Connect.PASSCODE)) {
> + if (hidePasscode && entry.getKey().toString().toLowerCase().contains(Stomp.Headers.Connect.PASSCODE)) {
> buffer.append("*****");
> } else {
> buffer.append(entry.getValue());
>
>
>
--
http://blog.garytully.com
http://fusesource.com