You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/08/12 22:29:31 UTC
svn commit: r1157238 [2/3] - in /activemq/trunk: activemq-core/
activemq-core/src/main/filtered-resources/
activemq-core/src/main/filtered-resources/org/
activemq-core/src/main/filtered-resources/org/apache/
activemq-core/src/main/filtered-resources/or...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Fri Aug 12 20:29:29 2011
@@ -16,10 +16,16 @@
*/
package org.apache.activemq.transport.stomp;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,6 +40,7 @@ import org.apache.activemq.command.Activ
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.Command;
+import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
@@ -62,7 +69,6 @@ import org.apache.activemq.util.Introspe
import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.context.ApplicationContextAware;
/**
* @author <a href="http://hiramchirino.com">chirino</a>
@@ -70,9 +76,29 @@ import org.springframework.context.Appli
public class ProtocolConverter {
private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);
-
+
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+ private static final String BROKER_VERSION;
+ private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);
+
+ private static final long DEFAULT_OUTBOUND_HEARTBEAT = 100;
+ private static final long DEFAULT_INBOUND_HEARTBEAT = 1000;
+ private static final long DEFAULT_INITIAL_HEARTBEAT_DELAY = 1000;
+
+ // Resolves the broker version reported in the "server" header of the CONNECTED frame.
+ // The hard-coded default is kept when version.txt is absent, empty or unreadable.
+ static {
+     String version = "5.6.0";
+     InputStream in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt");
+     if (in != null) {
+         try {
+             // Explicit charset so the result does not depend on the platform default.
+             BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
+             String line = reader.readLine();
+             // readLine() returns null for an empty resource; never let that replace the default.
+             if (line != null && line.trim().length() > 0) {
+                 version = line.trim();
+             }
+         } catch (IOException e) {
+             LOG.debug("Unable to read the broker version resource, using default: " + version, e);
+         } finally {
+             try {
+                 in.close();
+             } catch (IOException ignored) {
+                 // Nothing useful can be done if closing a classpath resource fails.
+             }
+         }
+     }
+     BROKER_VERSION = version;
+ }
+
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1);
private final ProducerId producerId = new ProducerId(sessionId, 1);
@@ -84,6 +110,7 @@ public class ProtocolConverter {
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
+ private final ConcurrentHashMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
@@ -92,13 +119,15 @@ public class ProtocolConverter {
private final Object commnadIdMutex = new Object();
private int lastCommandId;
private final AtomicBoolean connected = new AtomicBoolean(false);
- private final FrameTranslator frameTranslator;
+ private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
private final BrokerContext brokerContext;
+ private String version = "1.0";
+ private long hbReadInterval = DEFAULT_INBOUND_HEARTBEAT;
+ private long hbWriteInterval = DEFAULT_OUTBOUND_HEARTBEAT;
- public ProtocolConverter(StompTransport stompTransport, FrameTranslator translator, BrokerContext brokerContext) {
+ public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
this.stompTransport = stompTransport;
- this.frameTranslator = translator;
this.brokerContext = brokerContext;
}
@@ -178,6 +207,8 @@ public class ProtocolConverter {
onStompSend(command);
} else if (action.startsWith(Stomp.Commands.ACK)) {
onStompAck(command);
+ } else if (action.startsWith(Stomp.Commands.NACK)) {
+ onStompNack(command);
} else if (action.startsWith(Stomp.Commands.BEGIN)) {
onStompBegin(command);
} else if (action.startsWith(Stomp.Commands.COMMIT)) {
@@ -188,7 +219,8 @@ public class ProtocolConverter {
onStompSubscribe(command);
} else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
onStompUnsubscribe(command);
- } else if (action.startsWith(Stomp.Commands.CONNECT)) {
+ } else if (action.startsWith(Stomp.Commands.CONNECT) ||
+ action.startsWith(Stomp.Commands.STOMP)) {
onStompConnect(command);
} else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
onStompDisconnect(command);
@@ -199,7 +231,7 @@ public class ProtocolConverter {
} catch (ProtocolException e) {
handleException(e, command);
// Some protocol errors can cause the connection to get closed.
- if( e.isFatal() ) {
+ if (e.isFatal()) {
getStompTransport().onException(e);
}
}
@@ -219,6 +251,7 @@ public class ProtocolConverter {
HashMap<String, String> headers = new HashMap<String, String>();
headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
+ headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
if (command != null) {
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
@@ -235,6 +268,11 @@ public class ProtocolConverter {
checkConnected();
Map<String, String> headers = command.getHeaders();
+ String destination = headers.get(Stomp.Headers.Send.DESTINATION);
+ if (destination == null) {
+ throw new ProtocolException("SEND received without a Destination specified!");
+ }
+
String stompTx = headers.get(Stomp.Headers.TRANSACTION);
headers.remove("transaction");
@@ -255,24 +293,64 @@ public class ProtocolConverter {
message.onSend();
sendToActiveMQ(message, createResponseHandler(command));
+ }
+
+ /**
+  * Handles a NACK frame: validates it, resolves the target subscription and forwards
+  * the resulting {@link MessageAck} to the broker.
+  *
+  * @param command the NACK frame received from the client
+  * @throws ProtocolException if the connection is in v1.0 mode, a required header
+  *         (subscription, message-id) is missing, the transaction id is unknown, or
+  *         the subscription / message cannot be matched
+  */
+ protected void onStompNack(StompFrame command) throws ProtocolException {
+     checkConnected();
+
+     // NACK only exists from Stomp v1.1 on, so it must be refused on a v1.0 connection.
+     if (this.version.equals(Stomp.V1_0)) {
+         throw new ProtocolException("NACK received but connection is in v1.0 mode.");
+     }
+
+     Map<String, String> headers = command.getHeaders();
+
+     String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
+     if (subscriptionId == null) {
+         throw new ProtocolException("NACK received without a subscription id for acknowledge!");
+     }
+
+     String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+     if (messageId == null) {
+         throw new ProtocolException("NACK received without a message-id to acknowledge!");
+     }
+
+     TransactionId activemqTx = null;
+     String stompTx = headers.get(Stomp.Headers.TRANSACTION);
+     if (stompTx != null) {
+         activemqTx = transactions.get(stompTx);
+         if (activemqTx == null) {
+             throw new ProtocolException("Invalid transaction id: " + stompTx);
+         }
+     }
+
+     // An unknown subscription is reported the same way as an unmatched message, mirroring
+     // onStompAck, instead of being dropped without any feedback to the client.
+     StompSubscription sub = this.subscriptions.get(subscriptionId);
+     MessageAck ack = (sub == null) ? null : sub.onStompMessageNack(messageId, activemqTx);
+     if (ack == null) {
+         throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
+     }
+
+     sendToActiveMQ(ack, createResponseHandler(command));
}
protected void onStompAck(StompFrame command) throws ProtocolException {
checkConnected();
- // TODO: acking with just a message id is very bogus
- // since the same message id could have been sent to 2 different
- // subscriptions
- // on the same stomp connection. For example, when 2 subs are created on
- // the same topic.
-
Map<String, String> headers = command.getHeaders();
String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
if (messageId == null) {
throw new ProtocolException("ACK received without a message-id to acknowledge!");
}
+ String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
+ if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
+ throw new ProtocolException("ACK received without a subscription id for acknowledge!");
+ }
+
TransactionId activemqTx = null;
String stompTx = headers.get(Stomp.Headers.TRANSACTION);
if (stompTx != null) {
@@ -283,21 +361,37 @@ public class ProtocolConverter {
}
boolean acked = false;
- for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
- StompSubscription sub = iter.next();
- MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
- if (ack != null) {
- ack.setTransactionId(activemqTx);
- sendToActiveMQ(ack, createResponseHandler(command));
- acked = true;
- break;
+
+ if (subscriptionId != null) {
+
+ StompSubscription sub = this.subscriptions.get(subscriptionId);
+ if (sub != null) {
+ MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
+ if (ack != null) {
+ sendToActiveMQ(ack, createResponseHandler(command));
+ acked = true;
+ }
+ }
+
+ } else {
+
+ // TODO: acking with just a message id is very bogus since the same message id
+ // could have been sent to 2 different subscriptions on the same Stomp connection.
+ // For example, when 2 subs are created on the same topic.
+
+ for (StompSubscription sub : subscriptionsByConsumerId.values()) {
+ MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
+ if (ack != null) {
+ sendToActiveMQ(ack, createResponseHandler(command));
+ acked = true;
+ break;
+ }
}
}
if (!acked) {
throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
}
-
}
protected void onStompBegin(StompFrame command) throws ProtocolException {
@@ -324,7 +418,6 @@ public class ProtocolConverter {
tx.setType(TransactionInfo.BEGIN);
sendToActiveMQ(tx, createResponseHandler(command));
-
}
protected void onStompCommit(StompFrame command) throws ProtocolException {
@@ -342,8 +435,7 @@ public class ProtocolConverter {
throw new ProtocolException("Invalid transaction id: " + stompTx);
}
- for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
- StompSubscription sub = iter.next();
+ for (StompSubscription sub : subscriptionsByConsumerId.values()) {
sub.onStompCommit(activemqTx);
}
@@ -353,7 +445,6 @@ public class ProtocolConverter {
tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
sendToActiveMQ(tx, createResponseHandler(command));
-
}
protected void onStompAbort(StompFrame command) throws ProtocolException {
@@ -369,8 +460,7 @@ public class ProtocolConverter {
if (activemqTx == null) {
throw new ProtocolException("Invalid transaction id: " + stompTx);
}
- for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
- StompSubscription sub = iter.next();
+ for (StompSubscription sub : subscriptionsByConsumerId.values()) {
try {
sub.onStompAbort(activemqTx);
} catch (Exception e) {
@@ -384,7 +474,6 @@ public class ProtocolConverter {
tx.setType(TransactionInfo.ROLLBACK);
sendToActiveMQ(tx, createResponseHandler(command));
-
}
protected void onStompSubscribe(StompFrame command) throws ProtocolException {
@@ -395,6 +484,10 @@ public class ProtocolConverter {
String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
+ if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
+ throw new ProtocolException("SUBSCRIBE received without a subscription id!");
+ }
+
ActiveMQDestination actualDest = translator.convertDestination(this, destination);
if (actualDest == null) {
@@ -406,6 +499,16 @@ public class ProtocolConverter {
consumerInfo.setPrefetchSize(1000);
consumerInfo.setDispatchAsync(true);
+ String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
+ if (browser != null && browser.equals(Stomp.TRUE)) {
+
+ if (!this.version.equals(Stomp.V1_1)) {
+ throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1 clients!");
+ }
+
+ consumerInfo.setBrowser(true);
+ }
+
String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
consumerInfo.setSelector(selector);
@@ -413,7 +516,12 @@ public class ProtocolConverter {
consumerInfo.setDestination(translator.convertDestination(this, destination));
- StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+ StompSubscription stompSubscription;
+ if (!consumerInfo.isBrowser()) {
+ stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+ } else {
+ stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+ }
stompSubscription.setDestination(actualDest);
String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
@@ -426,8 +534,11 @@ public class ProtocolConverter {
}
subscriptionsByConsumerId.put(id, stompSubscription);
+ // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
+ if (subscriptionId != null) {
+ subscriptions.put(subscriptionId, stompSubscription);
+ }
sendToActiveMQ(consumerInfo, createResponseHandler(command));
-
}
protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
@@ -441,6 +552,9 @@ public class ProtocolConverter {
}
String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
+ if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
+ throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
+ }
if (subscriptionId == null && destination == null) {
throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
@@ -457,18 +571,26 @@ public class ProtocolConverter {
return;
}
- // TODO: Unsubscribing using a destination is a bit wierd if multiple
- // subscriptions
- // are created with the same destination. Perhaps this should be
- // removed.
- //
- for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
- StompSubscription sub = iter.next();
- if ((subscriptionId != null && subscriptionId.equals(sub.getSubscriptionId())) || (destination != null && destination.equals(sub.getDestination()))) {
+ if (subscriptionId != null) {
+
+ StompSubscription sub = this.subscriptions.remove(subscriptionId);
+ if (sub != null) {
sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
- iter.remove();
return;
}
+
+ } else {
+
+ // Unsubscribing using a destination is a bit weird if multiple subscriptions
+ // are created with the same destination.
+ for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
+ StompSubscription sub = iter.next();
+ if (destination != null && destination.equals(sub.getDestination())) {
+ sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
+ iter.remove();
+ return;
+ }
+ }
}
throw new ProtocolException("No subscription matched.");
@@ -488,10 +610,28 @@ public class ProtocolConverter {
String login = headers.get(Stomp.Headers.Connect.LOGIN);
String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
+ String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
+ String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
+ if (accepts == null) {
+ accepts = Stomp.DEFAULT_VERSION;
+ }
+ if (heartBeat == null) {
+ heartBeat = Stomp.DEFAULT_HEART_BEAT;
+ }
- IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
+ HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.split(Stomp.COMMA)));
+ acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
+ if (acceptsVersions.isEmpty()) {
+ throw new ProtocolException("Invlid Protocol version, supported versions are: " +
+ Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
+ } else {
+ this.version = Collections.max(acceptsVersions);
+ }
+ configureInactivityMonitor(heartBeat);
+
+ IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
connectionInfo.setConnectionId(connectionId);
if (clientId != null) {
connectionInfo.setClientId(clientId);
@@ -544,10 +684,22 @@ public class ProtocolConverter {
responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
}
+ responseHeaders.put(Stomp.Headers.Connected.VERSION, version);
+ responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT,
+ String.format("%d,%d", hbWriteInterval, hbReadInterval));
+ responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION);
+
StompFrame sc = new StompFrame();
sc.setAction(Stomp.Responses.CONNECTED);
sc.setHeaders(responseHeaders);
sendToStomp(sc);
+
+ if (version.equals(Stomp.V1_1)) {
+ StompWireFormat format = stompTransport.getWireFormat();
+ if (format != null) {
+ format.setEncodingEnabled(true);
+ }
+ }
}
});
@@ -576,7 +728,6 @@ public class ProtocolConverter {
*/
public void onActiveMQCommand(Command command) throws IOException, JMSException {
if (command.isResponse()) {
-
Response response = (Response)command;
ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
if (rh != null) {
@@ -589,12 +740,13 @@ public class ProtocolConverter {
}
}
} else if (command.isMessageDispatch()) {
-
MessageDispatch md = (MessageDispatch)command;
StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
if (sub != null) {
sub.onMessageDispatch(md);
}
+ } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
+ stompTransport.sendToStomp(ping);
} else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
// Pass down any unexpected async errors. Should this close the connection?
Throwable exception = ((ConnectionError)command).getException();
@@ -643,4 +795,49 @@ public class ProtocolConverter {
public String getCreatedTempDestinationName(ActiveMQDestination destination) {
return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
}
+
+ /**
+  * Parses a heart-beat header value and configures the transport's inactivity monitor.
+  * The first value becomes the read-check interval and the second the write-check
+  * interval; zero leaves the corresponding check at zero.
+  *
+  * @param heartBeatConfig header value made of two comma separated integers
+  * @throws ProtocolException (created with the fatal flag set) if the value does not
+  *         consist of exactly two parsable integers
+  */
+ protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
+
+     String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
+     if (keepAliveOpts.length != 2) {
+         throw new ProtocolException("Invlid heart-beat header:" + heartBeatConfig, true);
+     }
+
+     // Parse into locals first so a bad second value cannot leave the fields half-updated.
+     long requestedRead;
+     long requestedWrite;
+     try {
+         requestedRead = Long.parseLong(keepAliveOpts[0]);
+         requestedWrite = Long.parseLong(keepAliveOpts[1]);
+     } catch (NumberFormatException e) {
+         throw new ProtocolException("Invlid heart-beat header:" + heartBeatConfig, true);
+     }
+     hbReadInterval = requestedRead;
+     hbWriteInterval = requestedWrite;
+
+     if (hbReadInterval > 0) {
+         // Never check more often than the broker minimum, then add a grace period
+         // (at most 5000) on top of the interval before the read check fires.
+         hbReadInterval = Math.max(DEFAULT_INBOUND_HEARTBEAT, hbReadInterval);
+         hbReadInterval += Math.min(hbReadInterval, 5000);
+     }
+
+     if (hbWriteInterval > 0) {
+         hbWriteInterval = Math.max(DEFAULT_OUTBOUND_HEARTBEAT, hbWriteInterval);
+     }
+
+     try {
+
+         StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
+
+         monitor.setReadCheckTime(hbReadInterval);
+         monitor.setInitialDelayTime(DEFAULT_INITIAL_HEARTBEAT_DELAY);
+         monitor.setWriteCheckTime(hbWriteInterval);
+
+         monitor.startMonitoring();
+
+     } catch (Exception ex) {
+         // Best effort: heart-beating is switched off rather than failing the connect,
+         // but the cause is recorded instead of being discarded.
+         // NOTE(review): presumably this also covers transports without a monitor - confirm.
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("Unable to start the Stomp inactivity monitor, heart-beats disabled", ex);
+         }
+         hbReadInterval = 0;
+         hbWriteInterval = 0;
+     }
+
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("Stomp Connect heartbeat conf RW[" + hbReadInterval + "," + hbWriteInterval + "]");
+     }
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java Fri Aug 12 20:29:29 2011
@@ -20,7 +20,29 @@ public interface Stomp {
String NULL = "\u0000";
String NEWLINE = "\n";
+ byte BREAK = '\n';
+ byte COLON = ':';
+ byte ESCAPE = '\\';
+ byte[] ESCAPE_ESCAPE_SEQ = { 92, 92 };
+ byte[] COLON_ESCAPE_SEQ = { 92, 99 };
+ byte[] NEWLINE_ESCAPE_SEQ = { 92, 110 };
+
+ String COMMA = ",";
+ String V1_0 = "1.0";
+ String V1_1 = "1.1";
+ String DEFAULT_HEART_BEAT = "0,0";
+ String DEFAULT_VERSION = "1.0";
+ String EMPTY = "";
+
+ String[] SUPPORTED_PROTOCOL_VERSIONS = {"1.1", "1.0"};
+
+ String TEXT_PLAIN = "text/plain";
+ String TRUE = "true";
+ String FALSE = "false";
+ String END = "end";
+
public static interface Commands {
+ String STOMP = "STOMP";
String CONNECT = "CONNECT";
String SEND = "SEND";
String DISCONNECT = "DISCONNECT";
@@ -34,6 +56,8 @@ public interface Stomp {
String COMMIT = "COMMIT";
String ABORT = "ABORT";
String ACK = "ACK";
+ String NACK = "NACK";
+ String KEEPALIVE = "KEEPALIVE";
}
public interface Responses {
@@ -48,8 +72,10 @@ public interface Stomp {
String RECEIPT_REQUESTED = "receipt";
String TRANSACTION = "transaction";
String CONTENT_LENGTH = "content-length";
+ String CONTENT_TYPE = "content-type";
String TRANSFORMATION = "transformation";
String TRANSFORMATION_ERROR = "transformation-error";
+
/**
* This header is used to instruct ActiveMQ to construct the message
* based with a specific type.
@@ -81,6 +107,7 @@ public interface Stomp {
String TIMESTAMP = "timestamp";
String TYPE = "type";
String SUBSCRIPTION = "subscription";
+ String BROWSER = "browser";
String USERID = "JMSXUserID";
String ORIGINAL_DESTINATION = "original-destination";
}
@@ -90,6 +117,7 @@ public interface Stomp {
String ACK_MODE = "ack";
String ID = "id";
String SELECTOR = "selector";
+ String BROWSER = "browser";
public interface AckModeValues {
String AUTO = "auto";
@@ -108,6 +136,9 @@ public interface Stomp {
String PASSCODE = "passcode";
String CLIENT_ID = "client-id";
String REQUEST_ID = "request-id";
+ String ACCEPT_VERSION = "accept-version";
+ String HOST = "host";
+ String HEART_BEAT = "heart-beat";
}
public interface Error {
@@ -117,30 +148,34 @@ public interface Stomp {
public interface Connected {
String SESSION = "session";
String RESPONSE_ID = "response-id";
+ String SERVER = "server";
+ String VERSION = "version";
+ String HEART_BEAT = "heart-beat";
}
public interface Ack {
String MESSAGE_ID = "message-id";
+ String SUBSCRIPTION = "subscription";
}
}
- public enum Transformations {
- JMS_BYTE,
- JMS_XML,
- JMS_JSON,
- JMS_OBJECT_XML,
- JMS_OBJECT_JSON,
- JMS_MAP_XML,
- JMS_MAP_JSON,
- JMS_ADVISORY_XML,
- JMS_ADVISORY_JSON;
-
- public String toString() {
- return name().replaceAll("_", "-").toLowerCase();
- }
-
- public static Transformations getValue(String value) {
- return valueOf(value.replaceAll("-", "_").toUpperCase());
- }
- }
+ public enum Transformations {
+ JMS_BYTE,
+ JMS_XML,
+ JMS_JSON,
+ JMS_OBJECT_XML,
+ JMS_OBJECT_JSON,
+ JMS_MAP_XML,
+ JMS_MAP_JSON,
+ JMS_ADVISORY_XML,
+ JMS_ADVISORY_JSON;
+
+ public String toString() {
+ return name().replaceAll("_", "-").toLowerCase();
+ }
+
+ public static Transformations getValue(String value) {
+ return valueOf(value.replaceAll("-", "_").toUpperCase());
+ }
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java Fri Aug 12 20:29:29 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.transport.st
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.wireformat.WireFormat;
import java.io.ByteArrayInputStream;
import java.util.HashMap;
@@ -41,55 +40,55 @@ public class StompCodec {
}
public void parse(ByteArrayInputStream input, int readSize) throws Exception {
- int i = 0;
- int b;
- while(i++ < readSize) {
- b = input.read();
- // skip repeating nulls
- if (!processedHeaders && previousByte == 0 && b == 0) {
- continue;
+ int i = 0;
+ int b;
+ while(i++ < readSize) {
+ b = input.read();
+ // skip repeating nulls
+ if (!processedHeaders && previousByte == 0 && b == 0) {
+ continue;
+ }
+
+ if (!processedHeaders) {
+ currentCommand.write(b);
+ // end of headers section, parse action and header
+ if (previousByte == '\n' && b == '\n') {
+ if (transport.getWireFormat() instanceof StompWireFormat) {
+ DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
+ action = ((StompWireFormat)transport.getWireFormat()).parseAction(data);
+ headers = ((StompWireFormat)transport.getWireFormat()).parseHeaders(data);
+ String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
+ if (contentLengthHeader != null) {
+ contentLength = ((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader);
+ } else {
+ contentLength = -1;
+ }
}
+ processedHeaders = true;
+ currentCommand.reset();
+ }
+ } else {
- if (!processedHeaders) {
+ if (contentLength == -1) {
+ // end of command reached, unmarshal
+ if (b == 0) {
+ processCommand();
+ } else {
currentCommand.write(b);
- // end of headers section, parse action and header
- if (previousByte == '\n' && b == '\n') {
- if (transport.getWireFormat() instanceof StompWireFormat) {
- DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
- action = ((StompWireFormat)transport.getWireFormat()).parseAction(data);
- headers = ((StompWireFormat)transport.getWireFormat()).parseHeaders(data);
- String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
- if (contentLengthHeader != null) {
- contentLength = ((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader);
- } else {
- contentLength = -1;
- }
- }
- processedHeaders = true;
- currentCommand.reset();
- }
+ }
+ } else {
+ // read desired content length
+ if (readLength++ == contentLength) {
+ processCommand();
+ readLength = 0;
} else {
-
- if (contentLength == -1) {
- // end of command reached, unmarshal
- if (b == 0) {
- processCommand();
- } else {
- currentCommand.write(b);
- }
- } else {
- // read desired content length
- if (readLength++ == contentLength) {
- processCommand();
- readLength = 0;
- } else {
- currentCommand.write(b);
- }
- }
+ currentCommand.write(b);
}
-
- previousByte = b;
}
+ }
+
+ previousByte = b;
+ }
}
protected void processCommand() throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Fri Aug 12 20:29:29 2011
@@ -38,7 +38,7 @@ public class StompConnection {
}
public void open(Socket socket) {
- stompSocket = socket;
+ stompSocket = socket;
}
public void close() throws IOException {
@@ -70,8 +70,8 @@ public class StompConnection {
}
public StompFrame receive(long timeOut) throws Exception {
- stompSocket.setSoTimeout((int)timeOut);
- InputStream is = stompSocket.getInputStream();
+ stompSocket.setSoTimeout((int)timeOut);
+ InputStream is = stompSocket.getInputStream();
StompWireFormat wf = new StompWireFormat();
DataInputStream dis = new DataInputStream(is);
return (StompFrame)wf.unmarshal(dis);
@@ -104,143 +104,143 @@ public class StompConnection {
}
}
- private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
- byte[] ba = inputBuffer.toByteArray();
+ private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
+ byte[] ba = inputBuffer.toByteArray();
inputBuffer.reset();
return new String(ba, "UTF-8");
}
public Socket getStompSocket() {
- return stompSocket;
- }
+ return stompSocket;
+ }
- public void setStompSocket(Socket stompSocket) {
- this.stompSocket = stompSocket;
- }
+ public void setStompSocket(Socket stompSocket) {
+ this.stompSocket = stompSocket;
+ }
public void connect(String username, String password) throws Exception {
connect(username, password, null);
}
public void connect(String username, String password, String client) throws Exception {
- HashMap<String, String> headers = new HashMap();
- headers.put("login", username);
- headers.put("passcode", password);
- if (client != null) {
- headers.put("client-id", client);
- }
- StompFrame frame = new StompFrame("CONNECT", headers);
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put("login", username);
+ headers.put("passcode", password);
+ if (client != null) {
+ headers.put("client-id", client);
+ }
+ StompFrame frame = new StompFrame("CONNECT", headers);
sendFrame(frame.format());
StompFrame connect = receive();
if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
- throw new Exception ("Not connected: " + connect.getBody());
+ throw new Exception ("Not connected: " + connect.getBody());
}
}
public void disconnect() throws Exception {
- StompFrame frame = new StompFrame("DISCONNECT");
+ StompFrame frame = new StompFrame("DISCONNECT");
sendFrame(frame.format());
}
public void send(String destination, String message) throws Exception {
- send(destination, message, null, null);
+ send(destination, message, null, null);
}
public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
- if (headers == null) {
- headers = new HashMap<String, String>();
- }
- headers.put("destination", destination);
- if (transaction != null) {
- headers.put("transaction", transaction);
- }
- StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
+ if (headers == null) {
+ headers = new HashMap<String, String>();
+ }
+ headers.put("destination", destination);
+ if (transaction != null) {
+ headers.put("transaction", transaction);
+ }
+ StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
sendFrame(frame.format());
}
public void subscribe(String destination) throws Exception {
- subscribe(destination, null, null);
+ subscribe(destination, null, null);
}
public void subscribe(String destination, String ack) throws Exception {
- subscribe(destination, ack, new HashMap<String, String>());
+ subscribe(destination, ack, new HashMap<String, String>());
}
public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
- if (headers == null) {
- headers = new HashMap<String, String>();
- }
- headers.put("destination", destination);
- if (ack != null) {
- headers.put("ack", ack);
- }
- StompFrame frame = new StompFrame("SUBSCRIBE", headers);
+ if (headers == null) {
+ headers = new HashMap<String, String>();
+ }
+ headers.put("destination", destination);
+ if (ack != null) {
+ headers.put("ack", ack);
+ }
+ StompFrame frame = new StompFrame("SUBSCRIBE", headers);
sendFrame(frame.format());
}
public void unsubscribe(String destination) throws Exception {
- unsubscribe(destination, null);
+ unsubscribe(destination, null);
}
public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
- if (headers == null) {
- headers = new HashMap<String, String>();
- }
- headers.put("destination", destination);
- StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
+ if (headers == null) {
+ headers = new HashMap<String, String>();
+ }
+ headers.put("destination", destination);
+ StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
sendFrame(frame.format());
- }
-
+ }
+
public void begin(String transaction) throws Exception {
- HashMap<String, String> headers = new HashMap<String, String>();
- headers.put("transaction", transaction);
- StompFrame frame = new StompFrame("BEGIN", headers);
- sendFrame(frame.format());
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put("transaction", transaction);
+ StompFrame frame = new StompFrame("BEGIN", headers);
+ sendFrame(frame.format());
}
public void abort(String transaction) throws Exception {
- HashMap<String, String> headers = new HashMap<String, String>();
- headers.put("transaction", transaction);
- StompFrame frame = new StompFrame("ABORT", headers);
- sendFrame(frame.format());
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put("transaction", transaction);
+ StompFrame frame = new StompFrame("ABORT", headers);
+ sendFrame(frame.format());
}
public void commit(String transaction) throws Exception {
- HashMap<String, String> headers = new HashMap<String, String>();
- headers.put("transaction", transaction);
- StompFrame frame = new StompFrame("COMMIT", headers);
- sendFrame(frame.format());
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put("transaction", transaction);
+ StompFrame frame = new StompFrame("COMMIT", headers);
+ sendFrame(frame.format());
}
public void ack(StompFrame frame) throws Exception {
- ack(frame.getHeaders().get("message-id"), null);
+ ack(frame.getHeaders().get("message-id"), null);
}
public void ack(StompFrame frame, String transaction) throws Exception {
- ack(frame.getHeaders().get("message-id"), transaction);
+ ack(frame.getHeaders().get("message-id"), transaction);
}
public void ack(String messageId) throws Exception {
- ack(messageId, null);
+ ack(messageId, null);
}
public void ack(String messageId, String transaction) throws Exception {
- HashMap<String, String> headers = new HashMap<String, String>();
- headers.put("message-id", messageId);
- if (transaction != null)
- headers.put("transaction", transaction);
- StompFrame frame = new StompFrame("ACK", headers);
- sendFrame(frame.format());
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put("message-id", messageId);
+ if (transaction != null)
+ headers.put("transaction", transaction);
+ StompFrame frame = new StompFrame("ACK", headers);
+ sendFrame(frame.format());
}
protected String appendHeaders(HashMap<String, Object> headers) {
- StringBuffer result = new StringBuffer();
- for (String key : headers.keySet()) {
- result.append(key + ":" + headers.get(key) + "\n");
- }
- result.append("\n");
- return result.toString();
+ StringBuilder result = new StringBuilder();
+ for (String key : headers.keySet()) {
+ result.append(key + ":" + headers.get(key) + "\n");
+ }
+ result.append("\n");
+ return result.toString();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java Fri Aug 12 20:29:29 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.transport.st
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.command.Command;
@@ -180,12 +179,11 @@ public class StompFrame implements Comma
}
public String format(boolean forLogging) {
- StringBuffer buffer = new StringBuffer();
+ StringBuilder buffer = new StringBuilder();
buffer.append(getAction());
buffer.append("\n");
- Map headers = getHeaders();
- for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
- Map.Entry entry = (Map.Entry)iter.next();
+ Map<String, String> headers = getHeaders();
+ for (Map.Entry<String, String> entry : headers.entrySet()) {
buffer.append(entry.getKey());
buffer.append(":");
if (forLogging && entry.getKey().toString().toLowerCase().contains(Stomp.Headers.Connect.PASSCODE)) {
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java?rev=1157238&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java Fri Aug 12 20:29:29 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.transport.AbstractInactivityMonitor;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to make sure that commands are arriving periodically from the peer of
+ * the transport.
+ */
+public class StompInactivityMonitor extends AbstractInactivityMonitor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StompInactivityMonitor.class);
+
+ private boolean isConfigured = false;
+
+ public StompInactivityMonitor(Transport next, WireFormat wireFormat) {
+ super(next, wireFormat);
+ }
+
+ public void startMonitoring() throws IOException {
+ this.isConfigured = true;
+ this.startMonitorThreads();
+ }
+
+ @Override
+ protected void processInboundWireFormatInfo(WireFormatInfo info) throws IOException {
+ }
+
+ @Override
+ protected void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException{
+ }
+
+ @Override
+ protected boolean configuredOk() throws IOException {
+
+ if (!isConfigured) {
+ return false;
+ }
+
+ LOG.debug("Stomp Inactivity Monitor read check: " + getReadCheckTime() +
+ ", write check: " + getWriteCheckTime());
+
+ if (this.getReadCheckTime() >= 0 && this.getWriteCheckTime() >= 0) {
+ return true;
+ }
+
+ return false;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java Fri Aug 12 20:29:29 2011
@@ -17,12 +17,10 @@
package org.apache.activemq.transport.stomp;
import org.apache.activemq.transport.nio.NIOSSLTransport;
-import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import javax.net.SocketFactory;
import java.io.ByteArrayInputStream;
-import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java Fri Aug 12 20:29:29 2011
@@ -54,15 +54,15 @@ public class StompNIOSSLTransportFactory
return new StompNIOSSLTransport(wf, socketFactory, location, localLocation);
}
- @Override
+ @Override
public TransportServer doBind(URI location) throws IOException {
- if (SslContext.getCurrentSslContext() != null) {
- try {
- context = SslContext.getCurrentSslContext().getSSLContext();
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
+ if (SslContext.getCurrentSslContext() != null) {
+ try {
+ context = SslContext.getCurrentSslContext().getSSLContext();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
return super.doBind(location);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Fri Aug 12 20:29:29 2011
@@ -26,19 +26,14 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
-import java.util.HashMap;
import javax.net.SocketFactory;
-import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.util.ByteArrayOutputStream;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
@@ -46,7 +41,7 @@ import org.apache.activemq.wireformat.Wi
/**
* An implementation of the {@link Transport} interface for using Stomp over NIO
*
- *
+ *
*/
public class StompNIOTransport extends TcpTransport {
@@ -133,7 +128,7 @@ public class StompNIOTransport extends T
try {
selection.close();
} catch (Exception e) {
- e.printStackTrace();
+ e.printStackTrace();
}
super.doStop(stopper);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java Fri Aug 12 20:29:29 2011
@@ -35,12 +35,11 @@ import org.apache.activemq.transport.tcp
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.xbean.XBeanBrokerService;
/**
* A <a href="http://stomp.codehaus.org/">STOMP</a> over NIO transport factory
- *
- *
+ *
+ *
*/
public class StompNIOTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
@@ -60,20 +59,15 @@ public class StompNIOTransportFactory ex
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new StompNIOTransport(wf, socketFactory, location, localLocation);
- }
+ }
+ @SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
- transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), brokerContext);
+ transport = new StompTransportFilter(transport, format, brokerContext);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
- protected boolean isUseInactivityMonitor(Transport transport) {
- // lets disable the inactivity monitor as stomp does not use keep alive
- // packets
- return false;
- }
-
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java?rev=1157238&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java Fri Aug 12 20:29:29 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.TransactionId;
+
+public class StompQueueBrowserSubscription extends StompSubscription {
+
+ public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
+ super(stompTransport, subscriptionId, consumerInfo, transformation);
+ }
+
+ @Override
+ void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
+
+ if (md.getMessage() != null) {
+ super.onMessageDispatch(md);
+ } else {
+ StompFrame browseDone = new StompFrame(Stomp.Responses.MESSAGE);
+ browseDone.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, this.getSubscriptionId());
+ browseDone.getHeaders().put(Stomp.Headers.Message.BROWSER, "end");
+ browseDone.getHeaders().put(Stomp.Headers.Message.DESTINATION,
+ protocolConverter.findTranslator(null).convertDestination(protocolConverter, this.destination));
+ browseDone.getHeaders().put(Stomp.Headers.Message.MESSAGE_ID, "0");
+
+ protocolConverter.sendToStomp(browseDone);
+ }
+ }
+
+ @Override
+ public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
+ throw new ProtocolException("Cannot Nack a message on a Queue Browser Subscription.");
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java Fri Aug 12 20:29:29 2011
@@ -28,8 +28,8 @@ import org.apache.activemq.wireformat.Wi
/**
* A <a href="http://stomp.codehaus.org/">STOMP</a> over SSL transport factory
- *
- *
+ *
+ *
*/
public class StompSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
@@ -39,8 +39,9 @@ public class StompSslTransportFactory ex
return "stomp";
}
+ @SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
- transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), brokerContext);
+ transport = new StompTransportFilter(transport, format, brokerContext);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Fri Aug 12 20:29:29 2011
@@ -45,17 +45,16 @@ public class StompSubscription {
public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
- private final ProtocolConverter protocolConverter;
- private final String subscriptionId;
- private final ConsumerInfo consumerInfo;
-
- private final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
- private final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
-
- private String ackMode = AUTO_ACK;
- private ActiveMQDestination destination;
- private String transformation;
-
+ protected final ProtocolConverter protocolConverter;
+ protected final String subscriptionId;
+ protected final ConsumerInfo consumerInfo;
+
+ protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
+ protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
+
+ protected String ackMode = AUTO_ACK;
+ protected ActiveMQDestination destination;
+ protected String transformation;
public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
this.protocolConverter = stompTransport;
@@ -82,12 +81,12 @@ public class StompSubscription {
boolean ignoreTransformation = false;
if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
- message.setReadOnlyProperties(false);
- message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
+ message.setReadOnlyProperties(false);
+ message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
} else {
- if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
- ignoreTransformation = true;
- }
+ if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
+ ignoreTransformation = true;
+ }
}
StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
@@ -101,24 +100,25 @@ public class StompSubscription {
}
synchronized void onStompAbort(TransactionId transactionId) {
- unconsumedMessage.clear();
+ unconsumedMessage.clear();
}
synchronized void onStompCommit(TransactionId transactionId) {
- for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
+ for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
+ @SuppressWarnings("rawtypes")
Map.Entry entry = (Entry)iter.next();
- MessageId id = (MessageId)entry.getKey();
MessageDispatch msg = (MessageDispatch)entry.getValue();
if (unconsumedMessage.contains(msg)) {
- iter.remove();
+ iter.remove();
}
- }
- unconsumedMessage.clear();
+ }
+
+ unconsumedMessage.clear();
}
synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
- MessageId msgId = new MessageId(messageId);
+ MessageId msgId = new MessageId(messageId);
if (!dispatchedMessage.containsKey(msgId)) {
return null;
@@ -129,10 +129,11 @@ public class StompSubscription {
ack.setConsumerId(consumerInfo.getConsumerId());
if (ackMode == CLIENT_ACK) {
- ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+ ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
int count = 0;
- for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
+ for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
+ @SuppressWarnings("rawtypes")
Map.Entry entry = (Entry)iter.next();
MessageId id = (MessageId)entry.getKey();
MessageDispatch msg = (MessageDispatch)entry.getValue();
@@ -142,39 +143,59 @@ public class StompSubscription {
}
if (transactionId != null) {
- if (!unconsumedMessage.contains(msg)) {
- unconsumedMessage.add(msg);
- }
+ if (!unconsumedMessage.contains(msg)) {
+ unconsumedMessage.add(msg);
+ }
} else {
- iter.remove();
+ iter.remove();
}
-
count++;
if (id.equals(msgId)) {
ack.setLastMessageId(id);
break;
}
-
}
ack.setMessageCount(count);
if (transactionId != null) {
- ack.setTransactionId(transactionId);
+ ack.setTransactionId(transactionId);
}
- }
- else if (ackMode == INDIVIDUAL_ACK) {
+
+ } else if (ackMode == INDIVIDUAL_ACK) {
ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
ack.setMessageID(msgId);
if (transactionId != null) {
- unconsumedMessage.add(dispatchedMessage.get(msgId));
- ack.setTransactionId(transactionId);
+ unconsumedMessage.add(dispatchedMessage.get(msgId));
+ ack.setTransactionId(transactionId);
}
dispatchedMessage.remove(msgId);
}
return ack;
}
+ public synchronized MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
+ // NACK counterpart of onStompMessageAck (synchronized likewise): returns a poison ack, or null if msgId was never dispatched here.
+ MessageId msgId = new MessageId(messageId);
+
+ if (!dispatchedMessage.containsKey(msgId)) {
+ return null;
+ }
+
+ MessageAck ack = new MessageAck();
+ ack.setDestination(consumerInfo.getDestination());
+ ack.setConsumerId(consumerInfo.getConsumerId());
+ ack.setAckType(MessageAck.POSION_ACK_TYPE);
+ ack.setMessageID(msgId);
+ if (transactionId != null) {
+ unconsumedMessage.add(dispatchedMessage.get(msgId));
+ ack.setTransactionId(transactionId);
+ }
+ dispatchedMessage.remove(msgId);
+ // Return the populated ack so the caller can act on it; null is reserved for the not-dispatched case above.
+ return ack;
+ }
+
public String getAckMode() {
return ackMode;
}
@@ -198,5 +219,4 @@ public class StompSubscription {
public ConsumerInfo getConsumerInfo() {
return consumerInfo;
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java Fri Aug 12 20:29:29 2011
@@ -28,11 +28,14 @@ import org.apache.activemq.command.Comma
public interface StompTransport {
public void sendToActiveMQ(Command command);
-
+
public void sendToStomp(StompFrame command) throws IOException;
-
+
public X509Certificate[] getPeerCertificates();
-
+
public void onException(IOException error);
-
+
+ public StompInactivityMonitor getInactivityMonitor();
+
+ public StompWireFormat getWireFormat();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java Fri Aug 12 20:29:29 2011
@@ -16,48 +16,47 @@
*/
package org.apache.activemq.transport.stomp;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.Map;
-import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
-import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.xbean.XBeanBrokerService;
/**
* A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
- *
- *
+ *
+ *
*/
public class StompTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
- private BrokerContext brokerContext = null;
-
+ private BrokerContext brokerContext = null;
+
protected String getDefaultWireFormatType() {
return "stomp";
}
+ @SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
- transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), brokerContext);
+ transport = new StompTransportFilter(transport, format, brokerContext);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
- protected boolean isUseInactivityMonitor(Transport transport) {
- // lets disable the inactivity monitor as stomp does not use keep alive
- // packets
- return false;
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerContext = brokerService.getBrokerContext();
}
- public void setBrokerService(BrokerService brokerService) {
- this.brokerContext = brokerService.getBrokerContext();
- }
+ @Override
+ protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+ StompInactivityMonitor monitor = new StompInactivityMonitor(transport, format);
+
+ StompTransportFilter filter = (StompTransportFilter) transport.narrow(StompTransportFilter.class);
+ filter.setInactivityMonitor(monitor);
+
+ return monitor;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java Fri Aug 12 20:29:29 2011
@@ -28,6 +28,7 @@ import org.apache.activemq.transport.Tra
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,20 +37,24 @@ import org.slf4j.LoggerFactory;
* configured with the StompWireFormat and is used to convert STOMP commands to
* ActiveMQ commands. All of the conversion work is done by delegating to the
* ProtocolConverter.
- *
+ *
* @author <a href="http://hiramchirino.com">chirino</a>
*/
public class StompTransportFilter extends TransportFilter implements StompTransport {
private static final Logger LOG = LoggerFactory.getLogger(StompTransportFilter.class);
private final ProtocolConverter protocolConverter;
- private final FrameTranslator frameTranslator;
+ private StompInactivityMonitor monitor;
+ private StompWireFormat wireFormat;
private boolean trace;
- public StompTransportFilter(Transport next, FrameTranslator translator, BrokerContext brokerContext) {
+ public StompTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
super(next);
- this.frameTranslator = translator;
- this.protocolConverter = new ProtocolConverter(this, translator, brokerContext);
+ this.protocolConverter = new ProtocolConverter(this, brokerContext);
+
+ if (wireFormat instanceof StompWireFormat) {
+ this.wireFormat = (StompWireFormat) wireFormat;
+ }
}
public void oneway(Object o) throws IOException {
@@ -66,7 +71,7 @@ public class StompTransportFilter extend
if (trace) {
LOG.trace("Received: \n" + command);
}
-
+
protocolConverter.onStompCommand((StompFrame)command);
} catch (IOException e) {
onException(e);
@@ -92,21 +97,17 @@ public class StompTransportFilter extend
}
}
- public FrameTranslator getFrameTranslator() {
- return frameTranslator;
- }
-
public X509Certificate[] getPeerCertificates() {
- if(next instanceof SslTransport) {
- X509Certificate[] peerCerts = ((SslTransport)next).getPeerCertificates();
- if (trace && peerCerts != null) {
+ if(next instanceof SslTransport) {
+ X509Certificate[] peerCerts = ((SslTransport)next).getPeerCertificates();
+ if (trace && peerCerts != null) {
LOG.debug("Peer Identity has been verified\n");
}
- return peerCerts;
- }
- return null;
+ return peerCerts;
+ }
+ return null;
}
-
+
public boolean isTrace() {
return trace;
}
@@ -114,4 +115,18 @@ public class StompTransportFilter extend
public void setTrace(boolean trace) {
this.trace = trace;
}
+
+ /**
+ * Returns the inactivity monitor stored on this filter.
+ *
+ * @return the monitor last passed to {@link #setInactivityMonitor}, or
+ * null if none has been set.
+ */
+ @Override
+ public StompInactivityMonitor getInactivityMonitor() {
+ return monitor;
+ }
+
+ /**
+ * Stores the inactivity monitor that {@link #getInactivityMonitor()} will
+ * hand back; the value is kept as-is with no validation.
+ *
+ * @param monitor the monitor to associate with this filter.
+ */
+ public void setInactivityMonitor(StompInactivityMonitor monitor) {
+ this.monitor = monitor;
+ }
+
+ /**
+ * Returns the STOMP wire format captured by the constructor.
+ *
+ * @return the wire format, or null when the WireFormat given to the
+ * constructor was not a StompWireFormat (the field is only assigned
+ * in that case).
+ */
+ @Override
+ public StompWireFormat getWireFormat() {
+ return this.wireFormat;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Fri Aug 12 20:29:29 2011
@@ -21,8 +21,9 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.PushbackInputStream;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.util.ByteArrayInputStream;
@@ -44,6 +45,7 @@ public class StompWireFormat implements
private static final int MAX_HEADERS = 1000;
private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+ private boolean encodingEnabled = false;
private int version = 1;
public ByteSequence marshal(Object command) throws IOException {
@@ -63,16 +65,20 @@ public class StompWireFormat implements
public void marshal(Object command, DataOutput os) throws IOException {
StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame)command;
- StringBuffer buffer = new StringBuffer();
+ if (stomp.getAction().equals(Stomp.Commands.KEEPALIVE)) {
+ os.write(Stomp.BREAK);
+ return;
+ }
+
+ StringBuilder buffer = new StringBuilder();
buffer.append(stomp.getAction());
buffer.append(Stomp.NEWLINE);
// Output the headers.
- for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
- Map.Entry entry = (Map.Entry)iter.next();
+ for (Map.Entry<String, String> entry : stomp.getHeaders().entrySet()) {
buffer.append(entry.getKey());
buffer.append(Stomp.Headers.SEPERATOR);
- buffer.append(entry.getValue());
+ buffer.append(encodeHeader(entry.getValue()));
buffer.append(Stomp.NEWLINE);
}
@@ -87,7 +93,7 @@ public class StompWireFormat implements
public Object unmarshal(DataInput in) throws IOException {
try {
-
+
// parse action
String action = parseAction(in);
@@ -129,7 +135,6 @@ public class StompWireFormat implements
baos.close();
data = baos.toByteArray();
}
-
}
return new StompFrame(action, headers, data);
@@ -137,10 +142,14 @@ public class StompWireFormat implements
} catch (ProtocolException e) {
return new StompFrameError(e);
}
-
}
private String readLine(DataInput in, int maxLength, String errorMessage) throws IOException {
+ // Delegates the raw byte reading to readHeaderLine, then decodes the
+ // returned bytes as UTF-8 and trims leading/trailing whitespace.
+ ByteSequence sequence = readHeaderLine(in, maxLength, errorMessage);
+ return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8").trim();
+ }
+
+ private ByteSequence readHeaderLine(DataInput in, int maxLength, String errorMessage) throws IOException {
byte b;
ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
while ((b = in.readByte()) != '\n') {
@@ -150,10 +159,9 @@ public class StompWireFormat implements
baos.write(b);
}
baos.close();
- ByteSequence sequence = baos.toByteSequence();
- return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8");
+ return baos.toByteSequence();
}
-
+
protected String parseAction(DataInput in) throws IOException {
String action = null;
@@ -171,21 +179,35 @@ public class StompWireFormat implements
}
return action;
}
-
+
protected HashMap<String, String> parseHeaders(DataInput in) throws IOException {
HashMap<String, String> headers = new HashMap<String, String>(25);
while (true) {
- String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
- if (line != null && line.trim().length() > 0) {
+ ByteSequence line = readHeaderLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+ if (line != null && line.length > 0) {
if (headers.size() > MAX_HEADERS) {
throw new ProtocolException("The maximum number of headers was exceeded", true);
}
try {
- int seperatorIndex = line.indexOf(Stomp.Headers.SEPERATOR);
- String name = line.substring(0, seperatorIndex).trim();
- String value = line.substring(seperatorIndex + 1, line.length()).trim();
+
+ ByteArrayInputStream headerLine = new ByteArrayInputStream(line);
+ ByteArrayOutputStream stream = new ByteArrayOutputStream(line.length);
+
+ // First complete the name
+ int result = -1;
+ while ((result = headerLine.read()) != -1) {
+ if (result != ':') {
+ stream.write(result);
+ } else {
+ break;
+ }
+ }
+
+ ByteSequence nameSeq = stream.toByteSequence();
+ String name = new String(nameSeq.getData(), nameSeq.getOffset(), nameSeq.getLength(), "UTF-8").trim();
+ String value = decodeHeader(headerLine).trim();
headers.put(name, value);
} catch (Exception e) {
throw new ProtocolException("Unable to parser header line [" + line + "]", true);
@@ -193,10 +215,10 @@ public class StompWireFormat implements
} else {
break;
}
- }
+ }
return headers;
}
-
+
protected int parseContentLength(String contentLength) throws ProtocolException {
int length;
try {
@@ -208,10 +230,71 @@ public class StompWireFormat implements
if (length > MAX_DATA_LENGTH) {
throw new ProtocolException("The maximum data length was exceeded", true);
}
-
+
return length;
}
+ /**
+ * Escapes a header value for output when header encoding is enabled.
+ *
+ * The value is converted to UTF-8 and every byte equal to Stomp.ESCAPE,
+ * Stomp.BREAK or Stomp.COLON is replaced by the matching escape sequence
+ * (Stomp.ESCAPE_ESCAPE_SEQ, Stomp.NEWLINE_ESCAPE_SEQ,
+ * Stomp.COLON_ESCAPE_SEQ); all other bytes are copied unchanged. When
+ * encoding is disabled the value is returned untouched.
+ *
+ * @param header the header value to write.
+ * @return the escaped value if encoding is enabled, otherwise the
+ * original value.
+ * @throws IOException if the escaped bytes cannot be buffered or decoded.
+ */
+ private String encodeHeader(String header) throws IOException {
+ String result = header;
+ if (this.encodingEnabled) {
+ byte[] utf8buf = header.getBytes("UTF-8");
+ ByteArrayOutputStream stream = new ByteArrayOutputStream(utf8buf.length);
+ for(byte val : utf8buf) {
+ switch(val) {
+ case Stomp.ESCAPE:
+ stream.write(Stomp.ESCAPE_ESCAPE_SEQ);
+ break;
+ case Stomp.BREAK:
+ stream.write(Stomp.NEWLINE_ESCAPE_SEQ);
+ break;
+ case Stomp.COLON:
+ stream.write(Stomp.COLON_ESCAPE_SEQ);
+ break;
+ default:
+ stream.write(val);
+ }
+ }
+
+ // Hand the escaped bytes back to the caller; previously they were
+ // built and then discarded, so the raw header was always returned.
+ result = new String(stream.toByteArray(), "UTF-8");
+ }
+
+ return result;
+ }
+
+ /**
+ * Reads the remaining bytes of a header line and reverses header
+ * escaping, returning the result decoded as UTF-8.
+ *
+ * A backslash followed by 'n', 'c' or another backslash is replaced by
+ * Stomp.BREAK, Stomp.COLON or Stomp.ESCAPE respectively. A backslash
+ * followed by any other byte, or a backslash at the very end of the
+ * input, is kept literally.
+ *
+ * @param header stream positioned at the start of the header value.
+ * @return the decoded header value (not trimmed).
+ * @throws IOException if reading from the stream fails.
+ */
+ private String decodeHeader(InputStream header) throws IOException {
+ ByteArrayOutputStream decoded = new ByteArrayOutputStream();
+ // Pushback lets us peek one byte past a backslash and return it to the
+ // stream when it does not complete a known escape sequence.
+ PushbackInputStream stream = new PushbackInputStream(header);
+
+ int value = -1;
+ while( (value = stream.read()) != -1) {
+ // 92 is the ASCII code of '\' (start of an escape sequence).
+ if (value == 92) {
+
+ int next = stream.read();
+ if (next != -1) {
+ switch(next) {
+ // 110 is ASCII 'n': "\n" (presumably Stomp.BREAK is the newline byte).
+ case 110:
+ decoded.write(Stomp.BREAK);
+ break;
+ // 99 is ASCII 'c': "\c" stands for the colon.
+ case 99:
+ decoded.write(Stomp.COLON);
+ break;
+ // 92 is ASCII '\': "\\" stands for a single backslash.
+ case 92:
+ decoded.write(Stomp.ESCAPE);
+ break;
+ default:
+ // Not a recognised escape: emit the backslash as-is and
+ // re-read the following byte as ordinary data.
+ stream.unread(next);
+ decoded.write(value);
+ }
+ } else {
+ // Trailing backslash at end of input is kept literally.
+ decoded.write(value);
+ }
+
+ } else {
+ decoded.write(value);
+ }
+ }
+
+ return new String(decoded.toByteArray(), "UTF-8");
+ }
+
public int getVersion() {
return version;
}
@@ -220,4 +303,12 @@ public class StompWireFormat implements
this.version = version;
}
+ /**
+ * @return true if header values are passed through escape encoding when
+ * frames are marshalled; the field defaults to false.
+ */
+ public boolean isEncodingEnabled() {
+ return this.encodingEnabled;
+ }
+
+ /**
+ * Turns escape encoding of outgoing header values on or off.
+ *
+ * @param value true to escape header values in encodeHeader, false to
+ * write them unchanged.
+ */
+ public void setEncodingEnabled(boolean value) {
+ this.encodingEnabled = value;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java Fri Aug 12 20:29:29 2011
@@ -20,9 +20,6 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.security.NoSuchProviderException;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
@@ -30,21 +27,13 @@ import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.KeyManager;
-import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.SslContext;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
@@ -57,15 +46,15 @@ import org.slf4j.LoggerFactory;
* contribution from this class is that it is aware of SslTransportServer and
* SslTransport classes. All Transports and TransportServers created from this
* factory will have their needClientAuth option set to false.
- *
+ *
* @author sepandm@gmail.com (Sepand)
* @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
- *
+ *
*/
public class SslTransportFactory extends TcpTransportFactory {
// The log this uses.,
private static final Logger LOG = LoggerFactory.getLogger(SslTransportFactory.class);
-
+
/**
* Overriding to use SslTransportServer and allow for proper reflection.
*/
@@ -91,6 +80,7 @@ public class SslTransportFactory extends
* Overriding to allow for proper configuration through reflection but delegate to get common
* configuration
*/
+ @SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
SslTransport sslTransport = (SslTransport)transport.narrow(SslTransport.class);
@@ -120,14 +110,12 @@ public class SslTransportFactory extends
return new SslTransport(wf, (SSLSocketFactory)socketFactory, location, localLocation, false);
}
-
-
/**
* Creates a new SSL ServerSocketFactory. The given factory will use
* user-provided key and trust managers (if the user provided them).
- *
+ *
* @return Newly created (Ssl)ServerSocketFactory.
- * @throws IOException
+ * @throws IOException
*/
protected ServerSocketFactory createServerSocketFactory() throws IOException {
if( SslContext.getCurrentSslContext()!=null ) {
@@ -145,12 +133,12 @@ public class SslTransportFactory extends
/**
* Creates a new SSL SocketFactory. The given factory will use user-provided
* key and trust managers (if the user provided them).
- *
+ *
* @return Newly created (Ssl)SocketFactory.
- * @throws IOException
+ * @throws IOException
*/
protected SocketFactory createSocketFactory() throws IOException {
-
+
if( SslContext.getCurrentSslContext()!=null ) {
SslContext ctx = SslContext.getCurrentSslContext();
try {
@@ -161,11 +149,10 @@ public class SslTransportFactory extends
} else {
return SSLSocketFactory.getDefault();
}
-
}
/**
- *
+ *
* @param km
* @param tm
* @param random