You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/11/04 16:56:27 UTC

[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6482

Repository: activemq
Updated Branches:
  refs/heads/master 7cf7fba7a -> 450cabe4e


https://issues.apache.org/jira/browse/AMQ-6482

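Add a timeout to websocket sends so that the transport thread cannot
block indefinitely (while holding the protocol lock) when a send stalls.
The default is 30 seconds; it can be overridden with the system properties
org.apache.activemq.transport.ws.MQTTSocket.sendTimeout and
org.apache.activemq.transport.ws.StompSocket.sendTimeout (values in seconds).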


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/450cabe4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/450cabe4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/450cabe4

Branch: refs/heads/master
Commit: 450cabe4ead1fb78eec2e94013d2868a5bf864da
Parents: 946c945
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Fri Nov 4 12:54:39 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Fri Nov 4 12:56:20 2016 -0400

----------------------------------------------------------------------
 .../activemq/transport/ws/jetty9/MQTTSocket.java       | 12 +++++++++++-
 .../activemq/transport/ws/jetty9/StompSocket.java      | 13 +++++++++++--
 2 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/450cabe4/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
index d8c248d..2b4be15 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
@@ -46,7 +46,13 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
     @Override
     public void sendToMQTT(MQTTFrame command) throws IOException {
         ByteSequence bytes = wireFormat.marshal(command);
-        session.getRemote().sendBytes(ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength()));
+        try {
+            //timeout after a period of time so we don't wait forever and hold the protocol lock
+            session.getRemote().sendBytesByFuture(
+                    ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength())).get(getDefaultSendTimeOut(), TimeUnit.SECONDS);
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
     }
 
     @Override
@@ -117,4 +123,8 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
     @Override
     public void onWebSocketText(String arg0) {
     }
+
+    private static int getDefaultSendTimeOut() {
+        return Integer.getInteger("org.apache.activemq.transport.ws.MQTTSocket.sendTimeout", 30);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/450cabe4/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
index 76ac560..72efef7 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.activemq.transport.stomp.Stomp;
 import org.apache.activemq.transport.stomp.StompFrame;
 import org.apache.activemq.transport.ws.AbstractStompSocket;
+import org.apache.activemq.util.IOExceptionSupport;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.WebSocketListener;
 import org.slf4j.Logger;
@@ -44,8 +45,12 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene
 
     @Override
     public void sendToStomp(StompFrame command) throws IOException {
-        //Send async - do we need to wait for the future to complete?
-        session.getRemote().sendStringByFuture(command.format());
+        try {
+            //timeout after a period of time so we don't wait forever and hold the protocol lock
+            session.getRemote().sendStringByFuture(command.format()).get(getDefaultSendTimeOut(), TimeUnit.SECONDS);
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
     }
 
     @Override
@@ -90,4 +95,8 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene
     public void onWebSocketText(String data) {
         processStompFrame(data);
     }
+
+    private static int getDefaultSendTimeOut() {
+        return Integer.getInteger("org.apache.activemq.transport.ws.StompSocket.sendTimeout", 30);
+    }
 }


[2/2] activemq git commit: proposed websocket fix

Posted by cs...@apache.org.
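Proposed websocket fix: send STOMP frames asynchronously with
sendStringByFuture instead of the blocking sendString (the returned
future is not yet awaited).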


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/946c9454
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/946c9454
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/946c9454

Branch: refs/heads/master
Commit: 946c9454d512280cdae525ba25db370f2b61ff43
Parents: 7cf7fba
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Thu Nov 3 08:41:37 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Fri Nov 4 12:56:20 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/activemq/transport/ws/jetty9/StompSocket.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/946c9454/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
index ee012db..76ac560 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
@@ -44,7 +44,8 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene
 
     @Override
     public void sendToStomp(StompFrame command) throws IOException {
-        session.getRemote().sendString(command.format());
+        //Send async - do we need to wait for the future to complete?
+        session.getRemote().sendStringByFuture(command.format());
     }
 
     @Override