You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/08/01 12:49:40 UTC

svn commit: r1367915 - /activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java

Author: dejanb
Date: Wed Aug  1 10:49:39 2012
New Revision: 1367915

URL: http://svn.apache.org/viewvc?rev=1367915&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3955 - web socket transport race condition

Modified:
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java?rev=1367915&r1=1367914&r2=1367915&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java (original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java Wed Aug  1 10:49:39 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws
 
 import java.io.IOException;
 import java.security.cert.X509Certificate;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.TransportSupport;
@@ -30,14 +31,19 @@ import org.apache.activemq.util.ByteSequ
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.eclipse.jetty.websocket.WebSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implements web socket and mediates between servlet and the broker
  */
 class StompSocket extends TransportSupport implements WebSocket.OnTextMessage, StompTransport {
+    private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class);
+
     Connection outbound;
     ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
     StompWireFormat wireFormat = new StompWireFormat();
+    private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
 
     @Override
     public void onOpen(Connection connection) {
@@ -50,6 +56,17 @@ class StompSocket extends TransportSuppo
 
     @Override
     public void onMessage(String data) {
+
+        if (!transportStartedAtLeastOnce()) {
+            LOG.debug("Waiting for StompSocket to be properly started...");
+            try {
+                socketTransportStarted.await();
+            } catch (InterruptedException e) {
+                LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+            }
+        }
+
+
         try {
             protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8"))));
         } catch (Exception e) {
@@ -57,8 +74,13 @@ class StompSocket extends TransportSuppo
         }
     }
 
+    private boolean transportStartedAtLeastOnce() {
+        return socketTransportStarted.getCount() == 0;
+    }
+
     @Override
     protected void doStart() throws Exception {
+        socketTransportStarted.countDown();
     }
 
     @Override