You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2015/02/27 16:00:46 UTC

svn commit: r1662699 - in /tomcat/trunk/java/org/apache: coyote/http11/ coyote/http11/upgrade/ tomcat/util/net/

Author: markt
Date: Fri Feb 27 15:00:45 2015
New Revision: 1662699

URL: http://svn.apache.org/r1662699
Log:
Remove support for concurrent read/write on upgraded connections that use an internal upgrade handler.
Note: this breaks blocking text messages.

Modified:
    tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java

Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java?rev=1662699&r1=1662698&r2=1662699&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java Fri Feb 27 15:00:45 2015
@@ -289,7 +289,7 @@ public abstract class AbstractHttp11Prot
                 return new UpgradeProcessorInternal(socket, leftoverInput,
                         (InternalHttpUpgradeHandler) httpUpgradeHandler);
             } else {
-                return new UpgradeProcessorExternal(socket, leftoverInput, httpUpgradeHandler);
+                return new UpgradeProcessorExternal(socket, leftoverInput);
             }
         }
     }

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java?rev=1662699&r1=1662698&r2=1662699&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java Fri Feb 27 15:00:45 2015
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer;
 
 import javax.servlet.ServletInputStream;
 import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpUpgradeHandler;
 
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
@@ -39,8 +38,7 @@ public class UpgradeProcessorExternal ex
     private final UpgradeServletOutputStream upgradeServletOutputStream;
 
 
-    public UpgradeProcessorExternal(SocketWrapperBase<?> wrapper, ByteBuffer leftOverInput,
-            HttpUpgradeHandler httpUpgradeHandler) {
+    public UpgradeProcessorExternal(SocketWrapperBase<?> wrapper, ByteBuffer leftOverInput) {
         super(wrapper, leftOverInput);
         this.upgradeServletInputStream = new UpgradeServletInputStream(wrapper);
         this.upgradeServletOutputStream = new UpgradeServletOutputStream(wrapper);
@@ -51,10 +49,6 @@ public class UpgradeProcessorExternal ex
          */
         wrapper.setReadTimeout(INFINITE_TIMEOUT);
         wrapper.setWriteTimeout(INFINITE_TIMEOUT);
-
-        if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) {
-            wrapper.setInternalUpgrade(true);
-        }
     }
 
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java?rev=1662699&r1=1662698&r2=1662699&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java Fri Feb 27 15:00:45 2015
@@ -41,7 +41,6 @@ public class UpgradeProcessorInternal ex
         wrapper.setWriteTimeout(INFINITE_TIMEOUT);
 
         internalHttpUpgradeHandler.setSocketWrapper(wrapper);
-        wrapper.setInternalUpgrade(true);
     }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1662699&r1=1662698&r2=1662699&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Fri Feb 27 15:00:45 2015
@@ -2326,40 +2326,28 @@ public class AprEndpoint extends Abstrac
 
         @Override
         public void run() {
-            // Upgraded connections using an internal upgrade handler are
-            // allowed concurrent read/writes
-            if (socket.isInternalUpgrade() && SocketStatus.OPEN_WRITE == status) {
-                synchronized (socket.getWriteThreadLock()) {
-                    doRun();
+            synchronized (socket) {
+                // Process the request from this socket
+                if (socket.getSocket() == null || !connections.containsKey(socket)) {
+                    // Closed in another thread
+                    return;
                 }
-            } else {
-                synchronized (socket) {
-                    doRun();
+                SocketState state = handler.process(socket, status);
+                if (state == Handler.SocketState.CLOSED) {
+                    // Close socket and pool
+                    closeSocket(socket.getSocket().longValue());
+                    socket.reset(null, 1);
+                } else if (state == Handler.SocketState.LONG) {
+                    if (socket.isAsync()) {
+                        waitingRequests.add(socket);
+                    }
+                } else if (state == Handler.SocketState.ASYNC_END) {
+                    SocketProcessor proc = new SocketProcessor(socket,
+                            SocketStatus.OPEN_READ);
+                    getExecutor().execute(proc);
                 }
             }
         }
-
-        private void doRun() {
-            // Process the request from this socket
-            if (socket.getSocket() == null || !connections.containsKey(socket)) {
-                // Closed in another thread
-                return;
-            }
-            SocketState state = handler.process(socket, status);
-            if (state == Handler.SocketState.CLOSED) {
-                // Close socket and pool
-                closeSocket(socket.getSocket().longValue());
-                socket.reset(null, 1);
-            } else if (state == Handler.SocketState.LONG) {
-                if (socket.isAsync()) {
-                    waitingRequests.add(socket);
-                }
-            } else if (state == Handler.SocketState.ASYNC_END) {
-                SocketProcessor proc = new SocketProcessor(socket,
-                        SocketStatus.OPEN_READ);
-                getExecutor().execute(proc);
-            }
-        }
     }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1662699&r1=1662698&r2=1662699&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Fri Feb 27 15:00:45 2015
@@ -1640,112 +1640,100 @@ public class Nio2Endpoint extends Abstra
 
         @Override
         public void run() {
-            // Upgraded connections using an internal upgrade handler are
-            // allowed concurrent read/writes
-            if (socket.isInternalUpgrade() && SocketStatus.OPEN_WRITE == status) {
-                synchronized (socket.getWriteThreadLock()) {
-                    doRun();
-                }
-            } else {
-                synchronized (socket) {
-                    doRun();
-                }
-            }
-        }
-
-        private void doRun() {
-            boolean launch = false;
-            try {
-                int handshake = -1;
-
+            synchronized (socket) {
+                boolean launch = false;
                 try {
-                    if (socket.getSocket() != null) {
-                        // For STOP there is no point trying to handshake as the
-                        // Poller has been stopped.
-                        if (socket.getSocket().isHandshakeComplete() ||
-                                status == SocketStatus.STOP) {
-                            handshake = 0;
-                        } else {
-                            handshake = socket.getSocket().handshake();
-                            // The handshake process reads/writes from/to the
-                            // socket. status may therefore be OPEN_WRITE once
-                            // the handshake completes. However, the handshake
-                            // happens when the socket is opened so the status
-                            // must always be OPEN_READ after it completes. It
-                            // is OK to always set this as it is only used if
-                            // the handshake completes.
-                            status = SocketStatus.OPEN_READ;
+                    int handshake = -1;
+
+                    try {
+                        if (socket.getSocket() != null) {
+                            // For STOP there is no point trying to handshake as the
+                            // Poller has been stopped.
+                            if (socket.getSocket().isHandshakeComplete() ||
+                                    status == SocketStatus.STOP) {
+                                handshake = 0;
+                            } else {
+                                handshake = socket.getSocket().handshake();
+                                // The handshake process reads/writes from/to the
+                                // socket. status may therefore be OPEN_WRITE once
+                                // the handshake completes. However, the handshake
+                                // happens when the socket is opened so the status
+                                // must always be OPEN_READ after it completes. It
+                                // is OK to always set this as it is only used if
+                                // the handshake completes.
+                                status = SocketStatus.OPEN_READ;
+                            }
+                        }
+                    } catch (IOException x) {
+                        handshake = -1;
+                        if (log.isDebugEnabled()) {
+                            log.debug(sm.getString("endpoint.err.handshake"), x);
                         }
                     }
-                } catch (IOException x) {
-                    handshake = -1;
-                    if (log.isDebugEnabled()) {
-                        log.debug(sm.getString("endpoint.err.handshake"), x);
-                    }
-                }
-                if (handshake == 0) {
-                    SocketState state = SocketState.OPEN;
-                    // Process the request from this socket
-                    if (status == null) {
-                        state = handler.process(socket, SocketStatus.OPEN_READ);
-                    } else {
-                        state = handler.process(socket, status);
-                    }
-                    if (state == SocketState.CLOSED) {
-                        // Close socket and pool
+                    if (handshake == 0) {
+                        SocketState state = SocketState.OPEN;
+                        // Process the request from this socket
+                        if (status == null) {
+                            state = handler.process(socket, SocketStatus.OPEN_READ);
+                        } else {
+                            state = handler.process(socket, status);
+                        }
+                        if (state == SocketState.CLOSED) {
+                            // Close socket and pool
+                            closeSocket(socket);
+                            if (useCaches && running && !paused) {
+                                nioChannels.push(socket.getSocket());
+                                socketWrapperCache.push((Nio2SocketWrapper) socket);
+                            }
+                        } else if (state == SocketState.UPGRADING) {
+                            socket.setKeptAlive(true);
+                            launch = true;
+                        }
+                    } else if (handshake == -1 ) {
                         closeSocket(socket);
                         if (useCaches && running && !paused) {
                             nioChannels.push(socket.getSocket());
-                            socketWrapperCache.push((Nio2SocketWrapper) socket);
+                            socketWrapperCache.push(((Nio2SocketWrapper) socket));
                         }
-                    } else if (state == SocketState.UPGRADING) {
-                        socket.setKeptAlive(true);
-                        launch = true;
-                    }
-                } else if (handshake == -1 ) {
-                    closeSocket(socket);
-                    if (useCaches && running && !paused) {
-                        nioChannels.push(socket.getSocket());
-                        socketWrapperCache.push(((Nio2SocketWrapper) socket));
                     }
-                }
-            } catch (OutOfMemoryError oom) {
-                try {
-                    oomParachuteData = null;
-                    log.error("", oom);
-                    closeSocket(socket);
-                    releaseCaches();
-                } catch (Throwable oomt) {
+                } catch (OutOfMemoryError oom) {
                     try {
-                        System.err.println(oomParachuteMsg);
-                        oomt.printStackTrace();
-                    } catch (Throwable letsHopeWeDontGetHere){
-                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
+                        oomParachuteData = null;
+                        log.error("", oom);
+                        closeSocket(socket);
+                        releaseCaches();
+                    } catch (Throwable oomt) {
+                        try {
+                            System.err.println(oomParachuteMsg);
+                            oomt.printStackTrace();
+                        } catch (Throwable letsHopeWeDontGetHere){
+                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
+                        }
                     }
-                }
-            } catch (VirtualMachineError vme) {
-                ExceptionUtils.handleThrowable(vme);
-            } catch (Throwable t) {
-                log.error(sm.getString("endpoint.processing.fail"), t);
-                if (socket != null) {
-                    closeSocket(socket);
-                }
-            } finally {
-                if (launch) {
-                    try {
-                        getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
-                    } catch (NullPointerException npe) {
-                        if (running) {
-                            log.error(sm.getString("endpoint.launch.fail"),
-                                    npe);
+                } catch (VirtualMachineError vme) {
+                    ExceptionUtils.handleThrowable(vme);
+                } catch (Throwable t) {
+                    log.error(sm.getString("endpoint.processing.fail"), t);
+                    if (socket != null) {
+                        closeSocket(socket);
+                    }
+                } finally {
+                    if (launch) {
+                        try {
+                            getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
+                        } catch (NullPointerException npe) {
+                            if (running) {
+                                log.error(sm.getString("endpoint.launch.fail"),
+                                        npe);
+                            }
                         }
                     }
-                }
-                socket = null;
-                status = null;
-                //return to cache
-                if (useCaches && running && !paused) {
-                    processorCache.push(this);
+                    socket = null;
+                    status = null;
+                    //return to cache
+                    if (useCaches && running && !paused) {
+                        processorCache.push(this);
+                    }
                 }
             }
         }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1662699&r1=1662698&r2=1662699&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Fri Feb 27 15:00:45 2015
@@ -1689,84 +1689,68 @@ public class NioEndpoint extends Abstrac
             SelectionKey key = socket.getIOChannel().keyFor(
                     socket.getPoller().getSelector());
 
-            // Upgraded connections using an internal upgrade handler are
-            // allowed concurrent read/writes
-            if (ka.isInternalUpgrade() && SocketStatus.OPEN_WRITE == status) {
-                synchronized (ka.getWriteThreadLock()) {
-                    doRun(key, ka);
-                }
-            } else {
-                synchronized (socket) {
-                    doRun(key, ka);
-                }
-            }
-        }
-
-        private void doRun(SelectionKey key, NioSocketWrapper ka) {
-            NioChannel socket = ka.getSocket();
-
-            try {
-                int handshake = -1;
-
+            synchronized (socket) {
                 try {
-                    if (key != null && socket != null) {
-                        // For STOP there is no point trying to handshake as the
-                        // Poller has been stopped.
-                        if (socket.isHandshakeComplete() ||
-                                status == SocketStatus.STOP) {
-                            handshake = 0;
+                    int handshake = -1;
+
+                    try {
+                        if (key != null) {
+                            // For STOP there is no point trying to handshake as the
+                            // Poller has been stopped.
+                            if (socket.isHandshakeComplete() ||
+                                    status == SocketStatus.STOP) {
+                                handshake = 0;
+                            } else {
+                                handshake = socket.handshake(
+                                        key.isReadable(), key.isWritable());
+                                // The handshake process reads/writes from/to the
+                                // socket. status may therefore be OPEN_WRITE once
+                                // the handshake completes. However, the handshake
+                                // happens when the socket is opened so the status
+                                // must always be OPEN_READ after it completes. It
+                                // is OK to always set this as it is only used if
+                                // the handshake completes.
+                                status = SocketStatus.OPEN_READ;
+                            }
+                        }
+                    } catch (IOException x) {
+                        handshake = -1;
+                        if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
+                    } catch (CancelledKeyException ckx) {
+                        handshake = -1;
+                    }
+                    if (handshake == 0) {
+                        SocketState state = SocketState.OPEN;
+                        // Process the request from this socket
+                        if (status == null) {
+                            state = handler.process(ka, SocketStatus.OPEN_READ);
                         } else {
-                            handshake = socket.handshake(
-                                    key.isReadable(), key.isWritable());
-                            // The handshake process reads/writes from/to the
-                            // socket. status may therefore be OPEN_WRITE once
-                            // the handshake completes. However, the handshake
-                            // happens when the socket is opened so the status
-                            // must always be OPEN_READ after it completes. It
-                            // is OK to always set this as it is only used if
-                            // the handshake completes.
-                            status = SocketStatus.OPEN_READ;
-                        }
-                    }
-                } catch (IOException x) {
-                    handshake = -1;
-                    if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
-                } catch (CancelledKeyException ckx) {
-                    handshake = -1;
-                }
-                if (handshake == 0) {
-                    SocketState state = SocketState.OPEN;
-                    // Process the request from this socket
-                    if (status == null) {
-                        state = handler.process(ka, SocketStatus.OPEN_READ);
-                    } else {
-                        state = handler.process(ka, status);
-                    }
-                    if (state == SocketState.CLOSED) {
-                        // Close socket and pool
-                        try {
-                            if (socket.getPoller().cancelledKey(key) != null) {
-                                // SocketWrapper (attachment) was removed from the
-                                // key - recycle both. This can only happen once
-                                // per attempted closure so it is used to determine
-                                // whether or not to return socket and ka to
-                                // their respective caches. We do NOT want to do
-                                // this more than once - see BZ 57340.
-                                if (running && !paused) {
-                                    nioChannels.push(socket);
-                                }
-                                socket = null;
-                                if (running && !paused) {
-                                    keyCache.push(ka);
+                            state = handler.process(ka, status);
+                        }
+                        if (state == SocketState.CLOSED) {
+                            // Close socket and pool
+                            try {
+                                if (socket.getPoller().cancelledKey(key) != null) {
+                                    // SocketWrapper (attachment) was removed from the
+                                    // key - recycle both. This can only happen once
+                                    // per attempted closure so it is used to determine
+                                    // whether or not to return socket and ka to
+                                    // their respective caches. We do NOT want to do
+                                    // this more than once - see BZ 57340.
+                                    if (running && !paused) {
+                                        nioChannels.push(socket);
+                                    }
+                                    socket = null;
+                                    if (running && !paused) {
+                                        keyCache.push(ka);
+                                    }
                                 }
+                                ka = null;
+                            } catch (Exception x) {
+                                log.error("",x);
                             }
-                            ka = null;
-                        } catch (Exception x) {
-                            log.error("",x);
                         }
-                    }
-                } else if (handshake == -1 ) {
-                    if (socket != null) {
+                    } else if (handshake == -1 ) {
                         if (key != null) {
                             socket.getPoller().cancelledKey(key);
                         }
@@ -1774,47 +1758,47 @@ public class NioEndpoint extends Abstrac
                             nioChannels.push(socket);
                         }
                         socket = null;
+                        if (running && !paused) {
+                            keyCache.push(ka);
+                        }
+                        ka = null;
+                    } else {
+                        ka.getPoller().add(socket,handshake);
                     }
-                    if (running && !paused) {
-                        keyCache.push(ka);
-                    }
-                    ka = null;
-                } else {
-                    ka.getPoller().add(socket,handshake);
-                }
-            } catch (CancelledKeyException cx) {
-                if (socket != null) {
-                    socket.getPoller().cancelledKey(key);
-                }
-            } catch (OutOfMemoryError oom) {
-                try {
-                    oomParachuteData = null;
-                    log.error("", oom);
+                } catch (CancelledKeyException cx) {
                     if (socket != null) {
                         socket.getPoller().cancelledKey(key);
                     }
-                    releaseCaches();
-                } catch (Throwable oomt) {
+                } catch (OutOfMemoryError oom) {
                     try {
-                        System.err.println(oomParachuteMsg);
-                        oomt.printStackTrace();
-                    } catch (Throwable letsHopeWeDontGetHere){
-                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
+                        oomParachuteData = null;
+                        log.error("", oom);
+                        if (socket != null) {
+                            socket.getPoller().cancelledKey(key);
+                        }
+                        releaseCaches();
+                    } catch (Throwable oomt) {
+                        try {
+                            System.err.println(oomParachuteMsg);
+                            oomt.printStackTrace();
+                        } catch (Throwable letsHopeWeDontGetHere){
+                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
+                        }
+                    }
+                } catch (VirtualMachineError vme) {
+                    ExceptionUtils.handleThrowable(vme);
+                } catch (Throwable t) {
+                    log.error("", t);
+                    if (socket != null) {
+                        socket.getPoller().cancelledKey(key);
+                    }
+                } finally {
+                    socket = null;
+                    status = null;
+                    //return to cache
+                    if (running && !paused) {
+                        processorCache.push(this);
                     }
-                }
-            } catch (VirtualMachineError vme) {
-                ExceptionUtils.handleThrowable(vme);
-            } catch (Throwable t) {
-                log.error("", t);
-                if (socket != null) {
-                    socket.getPoller().cancelledKey(key);
-                }
-            } finally {
-                socket = null;
-                status = null;
-                //return to cache
-                if (running && !paused) {
-                    processorCache.push(this);
                 }
             }
         }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1662699&r1=1662698&r2=1662699&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Fri Feb 27 15:00:45 2015
@@ -51,7 +51,6 @@ public abstract class SocketWrapperBase<
     private volatile boolean async = false;
     private boolean keptAlive = false;
     private volatile boolean upgraded = false;
-    private volatile boolean internalUpgrade = false;
     private boolean secure = false;
     /*
      * Following cached for speed / reduced GC
@@ -153,10 +152,6 @@ public abstract class SocketWrapperBase<
     }
     public boolean isUpgraded() { return upgraded; }
     public void setUpgraded(boolean upgraded) { this.upgraded = upgraded; }
-    public boolean isInternalUpgrade() {return internalUpgrade; }
-    public void setInternalUpgrade(boolean internalUpgrade) {
-        this.internalUpgrade = internalUpgrade;
-    }
     public boolean isSecure() { return secure; }
     public void setSecure(boolean secure) { this.secure = secure; }
 
@@ -351,7 +346,6 @@ public abstract class SocketWrapperBase<
         this.readTimeout = soTimeout;
         this.writeTimeout = soTimeout;
         upgraded = false;
-        internalUpgrade = false;
         resetSocketBufferHandler(socket);
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org