You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2014/10/31 13:49:48 UTC

git commit: [CXF-6075] NPE may occur at websocket destination under high load

Repository: cxf
Updated Branches:
  refs/heads/master fe9aaf634 -> 56c0db051


[CXF-6075] NPE may occur at websocket destination under high load


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/56c0db05
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/56c0db05
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/56c0db05

Branch: refs/heads/master
Commit: 56c0db05126292a61a782f05848321b9b8b8b80c
Parents: fe9aaf6
Author: Akitoshi Yoshida <ay...@apache.org>
Authored: Fri Oct 31 13:49:12 2014 +0100
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Fri Oct 31 13:49:38 2014 +0100

----------------------------------------------------------------------
 .../atmosphere/AtmosphereWebSocketHandler.java    | 16 ++++++++++++++--
 .../transport/websocket/jetty/JettyWebSocket.java | 16 ++++++++++++++--
 .../jetty9/Jetty9WebSocketDestination.java        | 18 +++++++++++++++++-
 3 files changed, 45 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/56c0db05/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
index 776741e..38e6599 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
@@ -27,6 +27,7 @@ import java.security.Principal;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Locale;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -99,11 +100,11 @@ public class AtmosphereWebSocketHandler implements WebSocketProtocol {
     
     protected List<AtmosphereRequest> invokeService(final WebSocket webSocket,  final InputStream stream) {
         LOG.fine("invokeService(WebSocket, InputStream)");
-        // invoke the service directly as onMessage is synchronously blocked (in jetty)
+        // invoke the service asynchronously as onMessage is synchronously blocked (in jetty)
         // make sure the byte array passed to this method is immutable, as the websocket framework
         // may corrupt the byte array after this method is returned (i.e., before the data is returned in
         // the executor's thread.
-        destination.getExecutor().execute(new Runnable() {
+        executeServiceTask(new Runnable() {
             @Override
             public void run() {
                 HttpServletRequest request = null;
@@ -131,6 +132,17 @@ public class AtmosphereWebSocketHandler implements WebSocketProtocol {
         return null;
     }
 
+    private void executeServiceTask(Runnable r) {
+        try {
+            destination.getExecutor().execute(r);
+        } catch (RejectedExecutionException e) {
+            LOG.warning(
+                "Executor queue is full, run the service invocation task in caller thread." 
+                + "  Users can specify a larger executor queue to avoid this.");
+            r.run();
+        }
+    }
+    
     // may want to move this error reporting code to WebSocketServletHolder
     protected void reportErrorStatus(HttpServletResponse response, int status) {
         if (response != null) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/56c0db05/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
index 5bab332..6ae3c9f 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -109,7 +110,7 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa
         // make sure the byte array passed to this method is immutable, as the websocket framework
         // may corrupt the byte array after this method is returned (i.e., before the data is returned in
         // the executor's thread.
-        manager.getExecutor().execute(new Runnable() {
+        executeServiceTask(new Runnable() {
             @Override
             public void run() {
                 HttpServletRequest request = null;
@@ -133,7 +134,18 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa
             }
         });
     }
-    
+
+    private void executeServiceTask(Runnable r) {
+        try {
+            manager.getExecutor().execute(r);
+        } catch (RejectedExecutionException e) {
+            LOG.warning(
+                "Executor queue is full, run the service invocation task in caller thread." 
+                + "  Users can specify a larger executor queue to avoid this.");
+            r.run();
+        }
+    }
+
     // may want to move this error reporting code to WebSocketServletHolder
     private void reportErrorStatus(HttpServletResponse response, int status) {
         if (response != null) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/56c0db05/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java
index ec2eeb2..32e681e 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java
@@ -28,6 +28,8 @@ import java.util.Enumeration;
 import java.util.Locale;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.logging.Logger;
 
 import javax.servlet.DispatcherType;
 import javax.servlet.ServletConfig;
@@ -37,6 +39,7 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.classloader.ClassLoaderUtils;
+import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.http.DestinationRegistry;
 import org.apache.cxf.transport.http_jetty.JettyHTTPDestination;
@@ -62,6 +65,7 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
  */
 public class Jetty9WebSocketDestination extends JettyHTTPDestination implements 
     WebSocketDestinationService {
+    private static final Logger LOG = LogUtils.getL7dLogger(Jetty9WebSocketDestination.class);
 
     //REVISIT make these keys configurable
     private String requestIdKey = WebSocketConstants.DEFAULT_REQUEST_ID_KEY;
@@ -130,7 +134,7 @@ public class Jetty9WebSocketDestination extends JettyHTTPDestination implements
         // make sure the byte array passed to this method is immutable, as the websocket framework
         // may corrupt the byte array after this method is returned (i.e., before the data is returned in
         // the executor's thread.
-        executor.execute(new Runnable() {
+        executeServiceTask(new Runnable() {
             @Override
             public void run() {
                 HttpServletRequest request = null;
@@ -155,6 +159,18 @@ public class Jetty9WebSocketDestination extends JettyHTTPDestination implements
 
         });
     }
+
+    private void executeServiceTask(Runnable r) {
+        try {
+            executor.execute(r);
+        } catch (RejectedExecutionException e) {
+            LOG.warning(
+                "Executor queue is full, run the service invocation task in caller thread." 
+                + "  Users can specify a larger executor queue to avoid this.");
+            r.run();
+        }
+    }
+
     private void reportErrorStatus(Session session, int i, HttpServletResponse resp) {
         try {
             resp.sendError(i);