You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2014/10/31 13:49:48 UTC
git commit: [CXF-6075] NPE may occur at websocket destination under
high load
Repository: cxf
Updated Branches:
refs/heads/master fe9aaf634 -> 56c0db051
[CXF-6075] NPE may occur at websocket destination under high load
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/56c0db05
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/56c0db05
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/56c0db05
Branch: refs/heads/master
Commit: 56c0db05126292a61a782f05848321b9b8b8b80c
Parents: fe9aaf6
Author: Akitoshi Yoshida <ay...@apache.org>
Authored: Fri Oct 31 13:49:12 2014 +0100
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Fri Oct 31 13:49:38 2014 +0100
----------------------------------------------------------------------
.../atmosphere/AtmosphereWebSocketHandler.java | 16 ++++++++++++++--
.../transport/websocket/jetty/JettyWebSocket.java | 16 ++++++++++++++--
.../jetty9/Jetty9WebSocketDestination.java | 18 +++++++++++++++++-
3 files changed, 45 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/56c0db05/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
index 776741e..38e6599 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
@@ -27,6 +27,7 @@ import java.security.Principal;
import java.util.Enumeration;
import java.util.List;
import java.util.Locale;
+import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -99,11 +100,11 @@ public class AtmosphereWebSocketHandler implements WebSocketProtocol {
protected List<AtmosphereRequest> invokeService(final WebSocket webSocket, final InputStream stream) {
LOG.fine("invokeService(WebSocket, InputStream)");
- // invoke the service directly as onMessage is synchronously blocked (in jetty)
+ // invoke the service asynchronously as onMessage is synchronously blocked (in jetty)
// make sure the byte array passed to this method is immutable, as the websocket framework
// may corrupt the byte array after this method is returned (i.e., before the data is returned in
// the executor's thread.
- destination.getExecutor().execute(new Runnable() {
+ executeServiceTask(new Runnable() {
@Override
public void run() {
HttpServletRequest request = null;
@@ -131,6 +132,17 @@ public class AtmosphereWebSocketHandler implements WebSocketProtocol {
return null;
}
+ private void executeServiceTask(Runnable r) {
+ try {
+ destination.getExecutor().execute(r);
+ } catch (RejectedExecutionException e) {
+ LOG.warning(
+ "Executor queue is full, run the service invocation task in caller thread."
+ + " Users can specify a larger executor queue to avoid this.");
+ r.run();
+ }
+ }
+
// may want to move this error reporting code to WebSocketServletHolder
protected void reportErrorStatus(HttpServletResponse response, int status) {
if (response != null) {
http://git-wip-us.apache.org/repos/asf/cxf/blob/56c0db05/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
index 5bab332..6ae3c9f 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -109,7 +110,7 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa
// make sure the byte array passed to this method is immutable, as the websocket framework
// may corrupt the byte array after this method is returned (i.e., before the data is returned in
// the executor's thread.
- manager.getExecutor().execute(new Runnable() {
+ executeServiceTask(new Runnable() {
@Override
public void run() {
HttpServletRequest request = null;
@@ -133,7 +134,18 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa
}
});
}
-
+
+ private void executeServiceTask(Runnable r) {
+ try {
+ manager.getExecutor().execute(r);
+ } catch (RejectedExecutionException e) {
+ LOG.warning(
+ "Executor queue is full, run the service invocation task in caller thread."
+ + " Users can specify a larger executor queue to avoid this.");
+ r.run();
+ }
+ }
+
// may want to move this error reporting code to WebSocketServletHolder
private void reportErrorStatus(HttpServletResponse response, int status) {
if (response != null) {
http://git-wip-us.apache.org/repos/asf/cxf/blob/56c0db05/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java
index ec2eeb2..32e681e 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java
@@ -28,6 +28,8 @@ import java.util.Enumeration;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.logging.Logger;
import javax.servlet.DispatcherType;
import javax.servlet.ServletConfig;
@@ -37,6 +39,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.cxf.Bus;
import org.apache.cxf.common.classloader.ClassLoaderUtils;
+import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.http.DestinationRegistry;
import org.apache.cxf.transport.http_jetty.JettyHTTPDestination;
@@ -62,6 +65,7 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
*/
public class Jetty9WebSocketDestination extends JettyHTTPDestination implements
WebSocketDestinationService {
+ private static final Logger LOG = LogUtils.getL7dLogger(Jetty9WebSocketDestination.class);
//REVISIT make these keys configurable
private String requestIdKey = WebSocketConstants.DEFAULT_REQUEST_ID_KEY;
@@ -130,7 +134,7 @@ public class Jetty9WebSocketDestination extends JettyHTTPDestination implements
// make sure the byte array passed to this method is immutable, as the websocket framework
// may corrupt the byte array after this method is returned (i.e., before the data is returned in
// the executor's thread.
- executor.execute(new Runnable() {
+ executeServiceTask(new Runnable() {
@Override
public void run() {
HttpServletRequest request = null;
@@ -155,6 +159,18 @@ public class Jetty9WebSocketDestination extends JettyHTTPDestination implements
});
}
+
+ private void executeServiceTask(Runnable r) {
+ try {
+ executor.execute(r);
+ } catch (RejectedExecutionException e) {
+ LOG.warning(
+ "Executor queue is full, run the service invocation task in caller thread."
+ + " Users can specify a larger executor queue to avoid this.");
+ r.run();
+ }
+ }
+
private void reportErrorStatus(Session session, int i, HttpServletResponse resp) {
try {
resp.sendError(i);