You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/08/01 12:49:40 UTC
svn commit: r1367915 -
/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
Author: dejanb
Date: Wed Aug 1 10:49:39 2012
New Revision: 1367915
URL: http://svn.apache.org/viewvc?rev=1367915&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3955 - web socket transport race condition
Modified:
activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java?rev=1367915&r1=1367914&r2=1367915&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java (original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java Wed Aug 1 10:49:39 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws
import java.io.IOException;
import java.security.cert.X509Certificate;
+import java.util.concurrent.CountDownLatch;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.TransportSupport;
@@ -30,14 +31,19 @@ import org.apache.activemq.util.ByteSequ
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.eclipse.jetty.websocket.WebSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implements web socket and mediates between servlet and the broker
*/
class StompSocket extends TransportSupport implements WebSocket.OnTextMessage, StompTransport {
+ private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class);
+
Connection outbound;
ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
StompWireFormat wireFormat = new StompWireFormat();
+ private final CountDownLatch socketTransportStarted = new CountDownLatch(1); // one-shot gate: doStart() counts it down, onMessage() awaits it before handling a frame
@Override
public void onOpen(Connection connection) {
@@ -50,6 +56,17 @@ class StompSocket extends TransportSuppo
@Override
public void onMessage(String data) {
+
+ if (!transportStartedAtLeastOnce()) {
+ LOG.debug("Waiting for StompSocket to be properly started...");
+ try {
+ socketTransportStarted.await();
+ } catch (InterruptedException e) {
+ LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+ }
+ }
+
+
try {
protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8"))));
} catch (Exception e) {
@@ -57,8 +74,13 @@ class StompSocket extends TransportSuppo
}
}
+ private boolean transportStartedAtLeastOnce() { // true once the start latch has been released
+ return socketTransportStarted.getCount() == 0; // latch is created with count 1 and counted down in doStart()
+ }
+
@Override
protected void doStart() throws Exception {
+ socketTransportStarted.countDown(); // releases any onMessage() call blocked in socketTransportStarted.await()
}
@Override