You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2016/12/07 09:28:40 UTC

svn commit: r1773036 - in /tomcat/trunk: java/org/apache/coyote/ java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/ java/org/apache/tomcat/util/net/ java/org/apache/tomcat/websocket/server/ webapps/docs/

Author: markt
Date: Wed Dec  7 09:28:40 2016
New Revision: 1773036

URL: http://svn.apache.org/viewvc?rev=1773036&view=rev
Log:
Refactor the per Endpoint Acceptors into a single Acceptor class.

Added:
    tomcat/trunk/java/org/apache/tomcat/util/net/Acceptor.java   (with props)
Modified:
    tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java
    tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
    tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java
    tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11JsseProtocol.java
    tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java
    tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AbstractJsseEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
    tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
    tomcat/trunk/webapps/docs/changelog.xml

Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java Wed Dec  7 09:28:40 2016
@@ -46,7 +46,7 @@ public abstract class AbstractProcessor
     protected Adapter adapter;
     protected final AsyncStateMachine asyncStateMachine;
     private volatile long asyncTimeout = -1;
-    protected final AbstractEndpoint<?> endpoint;
+    protected final AbstractEndpoint<?,?> endpoint;
     protected final Request request;
     protected final Response response;
     protected volatile SocketWrapperBase<?> socketWrapper = null;
@@ -59,12 +59,12 @@ public abstract class AbstractProcessor
     private ErrorState errorState = ErrorState.NONE;
 
 
-    public AbstractProcessor(AbstractEndpoint<?> endpoint) {
+    public AbstractProcessor(AbstractEndpoint<?,?> endpoint) {
         this(endpoint, new Request(), new Response());
     }
 
 
-    protected AbstractProcessor(AbstractEndpoint<?> endpoint, Request coyoteRequest,
+    protected AbstractProcessor(AbstractEndpoint<?,?> endpoint, Request coyoteRequest,
             Response coyoteResponse) {
         this.endpoint = endpoint;
         asyncStateMachine = new AsyncStateMachine(this);

Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Wed Dec  7 09:28:40 2016
@@ -87,7 +87,7 @@ public abstract class AbstractProtocol<S
      * ProtocolHandler implementation (ProtocolHandler using NIO, requires NIO
      * Endpoint etc.).
      */
-    private final AbstractEndpoint<S> endpoint;
+    private final AbstractEndpoint<S,?> endpoint;
 
 
     private Handler<S> handler;
@@ -103,7 +103,7 @@ public abstract class AbstractProtocol<S
     private AsyncTimeout asyncTimeout = null;
 
 
-    public AbstractProtocol(AbstractEndpoint<S> endpoint) {
+    public AbstractProtocol(AbstractEndpoint<S,?> endpoint) {
         this.endpoint = endpoint;
         setConnectionLinger(Constants.DEFAULT_CONNECTION_LINGER);
         setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
@@ -364,7 +364,7 @@ public abstract class AbstractProtocol<S
 
     // ----------------------------------------------- Accessors for sub-classes
 
-    protected AbstractEndpoint<S> getEndpoint() {
+    protected AbstractEndpoint<S,?> getEndpoint() {
         return endpoint;
     }
 

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java Wed Dec  7 09:28:40 2016
@@ -41,7 +41,7 @@ public abstract class AbstractAjpProtoco
     protected static final StringManager sm = StringManager.getManager(AbstractAjpProtocol.class);
 
 
-    public AbstractAjpProtocol(AbstractEndpoint<S> endpoint) {
+    public AbstractAjpProtocol(AbstractEndpoint<S,?> endpoint) {
         super(endpoint);
         setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
         // AJP does not use Send File
@@ -64,7 +64,7 @@ public abstract class AbstractAjpProtoco
      * Overridden to make getter accessible to other classes in this package.
      */
     @Override
-    protected AbstractEndpoint<S> getEndpoint() {
+    protected AbstractEndpoint<S,?> getEndpoint() {
         return super.getEndpoint();
     }
 

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Wed Dec  7 09:28:40 2016
@@ -243,7 +243,7 @@ public class AjpProcessor extends Abstra
 
     // ------------------------------------------------------------ Constructor
 
-    public AjpProcessor(int packetSize, AbstractEndpoint<?> endpoint) {
+    public AjpProcessor(int packetSize, AbstractEndpoint<?,?> endpoint) {
 
         super(endpoint);
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11JsseProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11JsseProtocol.java?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11JsseProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11JsseProtocol.java Wed Dec  7 09:28:40 2016
@@ -22,15 +22,15 @@ import org.apache.tomcat.util.net.openss
 public abstract class AbstractHttp11JsseProtocol<S>
         extends AbstractHttp11Protocol<S> {
 
-    public AbstractHttp11JsseProtocol(AbstractJsseEndpoint<S> endpoint) {
+    public AbstractHttp11JsseProtocol(AbstractJsseEndpoint<S,?> endpoint) {
         super(endpoint);
     }
 
 
     @Override
-    protected AbstractJsseEndpoint<S> getEndpoint() {
+    protected AbstractJsseEndpoint<S,?> getEndpoint() {
         // Over-ridden to add cast
-        return (AbstractJsseEndpoint<S>) super.getEndpoint();
+        return (AbstractJsseEndpoint<S,?>) super.getEndpoint();
     }
 
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java Wed Dec  7 09:28:40 2016
@@ -47,7 +47,7 @@ public abstract class AbstractHttp11Prot
             StringManager.getManager(AbstractHttp11Protocol.class);
 
 
-    public AbstractHttp11Protocol(AbstractEndpoint<S> endpoint) {
+    public AbstractHttp11Protocol(AbstractEndpoint<S,?> endpoint) {
         super(endpoint);
         setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
         ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
@@ -81,7 +81,7 @@ public abstract class AbstractHttp11Prot
      * Over-ridden here to make the method visible to nested classes.
      */
     @Override
-    protected AbstractEndpoint<S> getEndpoint() {
+    protected AbstractEndpoint<S,?> getEndpoint() {
         return super.getEndpoint();
     }
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java Wed Dec  7 09:28:40 2016
@@ -221,7 +221,7 @@ public class Http11Processor extends Abs
     private final Map<String,UpgradeProtocol> httpUpgradeProtocols;
 
 
-    public Http11Processor(int maxHttpHeaderSize, AbstractEndpoint<?> endpoint,int maxTrailerSize,
+    public Http11Processor(int maxHttpHeaderSize, AbstractEndpoint<?,?> endpoint,int maxTrailerSize,
             Set<String> allowedTrailerHeaders, int maxExtensionSize, int maxSwallowSize,
             Map<String,UpgradeProtocol> httpUpgradeProtocols) {
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Wed Dec  7 09:28:40 2016
@@ -36,7 +36,7 @@ import org.apache.juli.logging.Log;
 import org.apache.tomcat.util.ExceptionUtils;
 import org.apache.tomcat.util.IntrospectionUtils;
 import org.apache.tomcat.util.collections.SynchronizedStack;
-import org.apache.tomcat.util.net.AbstractEndpoint.Acceptor.AcceptorState;
+import org.apache.tomcat.util.net.Acceptor.AcceptorState;
 import org.apache.tomcat.util.res.StringManager;
 import org.apache.tomcat.util.threads.LimitLatch;
 import org.apache.tomcat.util.threads.ResizableExecutor;
@@ -45,12 +45,15 @@ import org.apache.tomcat.util.threads.Ta
 import org.apache.tomcat.util.threads.ThreadPoolExecutor;
 
 /**
- * @param <S> The type for the sockets managed by this endpoint.
+ * @param <S> The type used by the socket wrapper associated with this endpoint.
+ *            May be the same as <U>.
+ * @param <U> The type of the underlying socket used by this endpoint. May be
+ *            the same as <S>.
  *
  * @author Mladen Turk
  * @author Remy Maucherat
  */
-public abstract class AbstractEndpoint<S> {
+public abstract class AbstractEndpoint<S,U> {
 
     // -------------------------------------------------------------- Constants
 
@@ -123,29 +126,6 @@ public abstract class AbstractEndpoint<S
         UNBOUND, BOUND_ON_INIT, BOUND_ON_START
     }
 
-    public abstract static class Acceptor implements Runnable {
-        public enum AcceptorState {
-            NEW, RUNNING, PAUSED, ENDED
-        }
-
-        protected volatile AcceptorState state = AcceptorState.NEW;
-        public final AcceptorState getState() {
-            return state;
-        }
-
-        private String threadName;
-        protected final void setThreadName(final String threadName) {
-            this.threadName = threadName;
-        }
-        protected final String getThreadName() {
-            return threadName;
-        }
-    }
-
-
-    private static final int INITIAL_ERROR_DELAY = 50;
-    private static final int MAX_ERROR_DELAY = 1600;
-
 
     // ----------------------------------------------------------------- Fields
 
@@ -182,7 +162,7 @@ public abstract class AbstractEndpoint<S
     /**
      * Threads used to accept new connections and pass them to worker threads.
      */
-    protected Acceptor[] acceptors;
+    protected List<Acceptor<U>> acceptors;
 
     /**
      * Cache for SocketProcessor objects
@@ -780,7 +760,7 @@ public abstract class AbstractEndpoint<S
     protected void unlockAccept() {
         // Only try to unlock the acceptor if it is necessary
         boolean unlockRequired = false;
-        for (Acceptor acceptor : acceptors) {
+        for (Acceptor<U> acceptor : acceptors) {
             if (acceptor.getState() == AcceptorState.RUNNING) {
                 unlockRequired = true;
                 break;
@@ -856,7 +836,7 @@ public abstract class AbstractEndpoint<S
 
                 // Wait for upto 1000ms acceptor threads to unlock
                 long waitLeft = 1000;
-                for (Acceptor acceptor : acceptors) {
+                for (Acceptor<U> acceptor : acceptors) {
                     while (waitLeft > 0 &&
                             acceptor.getState() == AcceptorState.RUNNING) {
                         Thread.sleep(50);
@@ -954,13 +934,14 @@ public abstract class AbstractEndpoint<S
 
     protected final void startAcceptorThreads() {
         int count = getAcceptorThreadCount();
-        acceptors = new Acceptor[count];
+        acceptors = new ArrayList<>(count);
 
         for (int i = 0; i < count; i++) {
-            acceptors[i] = createAcceptor();
+            Acceptor<U> acceptor = createAcceptor();
             String threadName = getName() + "-Acceptor-" + i;
-            acceptors[i].setThreadName(threadName);
-            Thread t = new Thread(acceptors[i], threadName);
+            acceptor.setThreadName(threadName);
+            acceptors.add(acceptor);
+            Thread t = new Thread(acceptor, threadName);
             t.setPriority(getAcceptorThreadPriority());
             t.setDaemon(getDaemon());
             t.start();
@@ -972,7 +953,7 @@ public abstract class AbstractEndpoint<S
      * Hook to allow Endpoints to provide a specific Acceptor implementation.
      * @return the acceptor
      */
-    protected abstract Acceptor createAcceptor();
+    protected abstract Acceptor<U> createAcceptor();
 
 
     /**
@@ -1045,35 +1026,14 @@ public abstract class AbstractEndpoint<S
         } else return -1;
     }
 
-    /**
-     * Provides a common approach for sub-classes to handle exceptions where a
-     * delay is required to prevent a Thread from entering a tight loop which
-     * will consume CPU and may also trigger large amounts of logging. For
-     * example, this can happen with the Acceptor thread if the ulimit for open
-     * files is reached.
-     *
-     * @param currentErrorDelay The current delay being applied on failure
-     * @return  The delay to apply on the next failure
-     */
-    protected int handleExceptionWithDelay(int currentErrorDelay) {
-        // Don't delay on first exception
-        if (currentErrorDelay > 0) {
-            try {
-                Thread.sleep(currentErrorDelay);
-            } catch (InterruptedException e) {
-                // Ignore
-            }
-        }
+    protected abstract U serverSocketAccept() throws Exception;
 
-        // On subsequent exceptions, start the delay at 50ms, doubling the delay
-        // on every subsequent exception until the delay reaches 1.6 seconds.
-        if (currentErrorDelay == 0) {
-            return INITIAL_ERROR_DELAY;
-        } else if (currentErrorDelay < MAX_ERROR_DELAY) {
-            return currentErrorDelay * 2;
-        } else {
-            return MAX_ERROR_DELAY;
-        }
+    protected abstract boolean setSocketOptions(U socket);
+
+    protected abstract void closeSocket(U socket);
+
+    protected void destroySocket(U socket) {
+        closeSocket(socket);
     }
 }
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractJsseEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractJsseEndpoint.java?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractJsseEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractJsseEndpoint.java Wed Dec  7 09:28:40 2016
@@ -34,7 +34,7 @@ import org.apache.tomcat.util.net.SSLHos
 import org.apache.tomcat.util.net.openssl.OpenSSLImplementation;
 import org.apache.tomcat.util.net.openssl.ciphers.Cipher;
 
-public abstract class AbstractJsseEndpoint<S> extends AbstractEndpoint<S> {
+public abstract class AbstractJsseEndpoint<S,U> extends AbstractEndpoint<S,U> {
 
     private String sslImplementationName = null;
     private int sniParseLimit = 64 * 1024;

Added: tomcat/trunk/java/org/apache/tomcat/util/net/Acceptor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Acceptor.java?rev=1773036&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Acceptor.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Acceptor.java Wed Dec  7 09:28:40 2016
@@ -0,0 +1,173 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.jni.Error;
+import org.apache.tomcat.util.ExceptionUtils;
+import org.apache.tomcat.util.res.StringManager;
+
+public class Acceptor<U> implements Runnable {
+
+    private static final Log log = LogFactory.getLog(Acceptor.class);
+    private static final StringManager sm = StringManager.getManager(Acceptor.class);
+
+    private static final int INITIAL_ERROR_DELAY = 50;
+    private static final int MAX_ERROR_DELAY = 1600;
+
+    private final AbstractEndpoint<?,U> endpoint;
+    private String threadName;
+    protected volatile AcceptorState state = AcceptorState.NEW;
+
+
+    public Acceptor(AbstractEndpoint<?,U> endpoint) {
+        this.endpoint = endpoint;
+    }
+
+
+    public final AcceptorState getState() {
+        return state;
+    }
+
+
+    final void setThreadName(final String threadName) {
+        this.threadName = threadName;
+    }
+
+
+    final String getThreadName() {
+        return threadName;
+    }
+
+
+    @Override
+    public void run() {
+
+        int errorDelay = 0;
+
+        // Loop until we receive a shutdown command
+        while (endpoint.isRunning()) {
+
+            // Loop if endpoint is paused
+            while (endpoint.isPaused() && endpoint.isRunning()) {
+                state = AcceptorState.PAUSED;
+                try {
+                    Thread.sleep(50);
+                } catch (InterruptedException e) {
+                    // Ignore
+                }
+            }
+
+            if (!endpoint.isRunning()) {
+                break;
+            }
+            state = AcceptorState.RUNNING;
+
+            try {
+                //if we have reached max connections, wait
+                endpoint.countUpOrAwaitConnection();
+
+                U socket = null;
+                try {
+                    // Accept the next incoming connection from the server
+                    // socket
+                    socket = endpoint.serverSocketAccept();
+                } catch (Exception ioe) {
+                    // We didn't get a socket
+                    endpoint.countDownConnection();
+                    if (endpoint.isRunning()) {
+                        // Introduce delay if necessary
+                        errorDelay = handleExceptionWithDelay(errorDelay);
+                        // re-throw
+                        throw ioe;
+                    } else {
+                        break;
+                    }
+                }
+                // Successful accept, reset the error delay
+                errorDelay = 0;
+
+                // Configure the socket
+                if (endpoint.isRunning() && !endpoint.isPaused()) {
+                    // setSocketOptions() will hand the socket off to
+                    // an appropriate processor if successful
+                    if (!endpoint.setSocketOptions(socket)) {
+                        endpoint.closeSocket(socket);
+                    }
+                } else {
+                    endpoint.destroySocket(socket);
+                }
+            } catch (Throwable t) {
+                ExceptionUtils.handleThrowable(t);
+                String msg = sm.getString("endpoint.accept.fail");
+                // APR specific.
+                // Could push this down but not sure it is worth the trouble.
+                if (t instanceof Error) {
+                    Error e = (Error) t;
+                    if (e.getError() == 233) {
+                        // Not an error on HP-UX so log as a warning
+                        // so it can be filtered out on that platform
+                        // See bug 50273
+                        log.warn(msg, t);
+                    } else {
+                        log.error(msg, t);
+                    }
+                } else {
+                        log.error(msg, t);
+                }
+            }
+        }
+        state = AcceptorState.ENDED;
+    }
+
+
+    /**
+     * Handles exceptions where a delay is required to prevent a Thread from
+     * entering a tight loop which will consume CPU and may also trigger large
+     * amounts of logging. For example, this can happen if the ulimit for open
+     * files is reached.
+     *
+     * @param currentErrorDelay The current delay being applied on failure
+     * @return  The delay to apply on the next failure
+     */
+    private int handleExceptionWithDelay(int currentErrorDelay) {
+        // Don't delay on first exception
+        if (currentErrorDelay > 0) {
+            try {
+                Thread.sleep(currentErrorDelay);
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+        }
+
+        // On subsequent exceptions, start the delay at 50ms, doubling the delay
+        // on every subsequent exception until the delay reaches 1.6 seconds.
+        if (currentErrorDelay == 0) {
+            return INITIAL_ERROR_DELAY;
+        } else if (currentErrorDelay < MAX_ERROR_DELAY) {
+            return currentErrorDelay * 2;
+        } else {
+            return MAX_ERROR_DELAY;
+        }
+    }
+
+
+    public enum AcceptorState {
+        NEW, RUNNING, PAUSED, ENDED
+    }
+}

Propchange: tomcat/trunk/java/org/apache/tomcat/util/net/Acceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Wed Dec  7 09:28:40 2016
@@ -52,8 +52,8 @@ import org.apache.tomcat.jni.Status;
 import org.apache.tomcat.util.ExceptionUtils;
 import org.apache.tomcat.util.buf.ByteBufferUtils;
 import org.apache.tomcat.util.collections.SynchronizedStack;
-import org.apache.tomcat.util.net.AbstractEndpoint.Acceptor.AcceptorState;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.Acceptor.AcceptorState;
 import org.apache.tomcat.util.net.SSLHostConfig.Type;
 import org.apache.tomcat.util.net.openssl.OpenSSLEngine;
 
@@ -73,7 +73,7 @@ import org.apache.tomcat.util.net.openss
  * @author Mladen Turk
  * @author Remy Maucherat
  */
-public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack {
+public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallBack {
 
     // -------------------------------------------------------------- Constants
 
@@ -636,7 +636,7 @@ public class AprEndpoint extends Abstrac
                     // Ignore
                 }
             }
-            for (AbstractEndpoint.Acceptor acceptor : acceptors) {
+            for (Acceptor<Long> acceptor : acceptors) {
                 long waitLeft = 10000;
                 while (waitLeft > 0 &&
                         acceptor.getState() != AcceptorState.ENDED &&
@@ -723,8 +723,8 @@ public class AprEndpoint extends Abstrac
     // ------------------------------------------------------ Protected Methods
 
     @Override
-    protected AbstractEndpoint.Acceptor createAcceptor() {
-        return new Acceptor();
+    protected Acceptor<Long> createAcceptor() {
+        return new Acceptor<>(this);
     }
 
 
@@ -817,20 +817,20 @@ public class AprEndpoint extends Abstrac
      *  and processing may continue, <code>false</code> if the socket needs to be
      *  close immediately
      */
-    protected boolean processSocketWithOptions(long socket) {
+    @Override
+    protected boolean setSocketOptions(Long socket) {
         try {
             // During shutdown, executor may be null - avoid NPE
             if (running) {
                 if (log.isDebugEnabled()) {
-                    log.debug(sm.getString("endpoint.debug.socket",
-                            Long.valueOf(socket)));
+                    log.debug(sm.getString("endpoint.debug.socket", socket));
                 }
-                AprSocketWrapper wrapper = new AprSocketWrapper(Long.valueOf(socket), this);
+                AprSocketWrapper wrapper = new AprSocketWrapper(socket, this);
                 wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
                 wrapper.setSecure(isSSLEnabled());
                 wrapper.setReadTimeout(getConnectionTimeout());
                 wrapper.setWriteTimeout(getConnectionTimeout());
-                connections.put(Long.valueOf(socket), wrapper);
+                connections.put(socket, wrapper);
                 getExecutor().execute(new SocketWithOptionsProcessor(wrapper));
             }
         } catch (RejectedExecutionException x) {
@@ -847,6 +847,20 @@ public class AprEndpoint extends Abstrac
     }
 
 
+    @Override
+    protected Long serverSocketAccept() throws Exception {
+        long socket = Socket.accept(serverSock);
+        if (log.isDebugEnabled()) {
+            long sa = Address.get(Socket.APR_REMOTE, socket);
+            Sockaddr addr = Address.getInfo(sa);
+            log.debug(sm.getString("endpoint.apr.remoteport",
+                    Long.valueOf(socket),
+                    Long.valueOf(addr.port)));
+        }
+        return Long.valueOf(socket);
+    }
+
+
     /**
      * Process the given socket. Typically keep alive or upgraded protocol.
      *
@@ -870,6 +884,12 @@ public class AprEndpoint extends Abstrac
     }
 
 
+    @Override
+    protected void closeSocket(Long socket) {
+        closeSocket(socket.longValue());
+    }
+
+
     private void closeSocket(long socket) {
         // Once this is called, the mapping from socket to wrapper will no
         // longer be required.
@@ -885,6 +905,17 @@ public class AprEndpoint extends Abstrac
      * is currently being used by the Poller. It is generally a bad idea to call
      * this directly from a known error condition.
      */
+    @Override
+    protected void destroySocket(Long socket) {
+        destroySocket(socket.longValue());
+    }
+
+
+    /*
+     * This method should only be called if there is no chance that the socket
+     * is currently being used by the Poller. It is generally a bad idea to call
+     * this directly from a known error condition.
+     */
     private void destroySocket(long socket) {
         connections.remove(Long.valueOf(socket));
         if (log.isDebugEnabled()) {
@@ -911,105 +942,6 @@ public class AprEndpoint extends Abstrac
         return log;
     }
 
-    // --------------------------------------------------- Acceptor Inner Class
-    /**
-     * The background thread that listens for incoming TCP/IP connections and
-     * hands them off to an appropriate processor.
-     */
-    protected class Acceptor extends AbstractEndpoint.Acceptor {
-
-        private final Log log = LogFactory.getLog(AprEndpoint.Acceptor.class);
-
-        @Override
-        public void run() {
-
-            int errorDelay = 0;
-
-            // Loop until we receive a shutdown command
-            while (running) {
-
-                // Loop if endpoint is paused
-                while (paused && running) {
-                    state = AcceptorState.PAUSED;
-                    try {
-                        Thread.sleep(50);
-                    } catch (InterruptedException e) {
-                        // Ignore
-                    }
-                }
-
-                if (!running) {
-                    break;
-                }
-                state = AcceptorState.RUNNING;
-
-                try {
-                    //if we have reached max connections, wait
-                    countUpOrAwaitConnection();
-
-                    long socket = 0;
-                    try {
-                        // Accept the next incoming connection from the server
-                        // socket
-                        socket = Socket.accept(serverSock);
-                        if (log.isDebugEnabled()) {
-                            long sa = Address.get(Socket.APR_REMOTE, socket);
-                            Sockaddr addr = Address.getInfo(sa);
-                            log.debug(sm.getString("endpoint.apr.remoteport",
-                                    Long.valueOf(socket),
-                                    Long.valueOf(addr.port)));
-                        }
-                    } catch (Exception e) {
-                        // We didn't get a socket
-                        countDownConnection();
-                        if (running) {
-                            // Introduce delay if necessary
-                            errorDelay = handleExceptionWithDelay(errorDelay);
-                            // re-throw
-                            throw e;
-                        } else {
-                            break;
-                        }
-                    }
-                    // Successful accept, reset the error delay
-                    errorDelay = 0;
-
-                    if (running && !paused) {
-                        // Hand this socket off to an appropriate processor
-                        if (!processSocketWithOptions(socket)) {
-                            // Close socket right away
-                            closeSocket(socket);
-                        }
-                    } else {
-                        // Close socket right away
-                        // No code path could have added the socket to the
-                        // Poller so use destroySocket()
-                        destroySocket(socket);
-                    }
-                } catch (Throwable t) {
-                    ExceptionUtils.handleThrowable(t);
-                    String msg = sm.getString("endpoint.accept.fail");
-                    if (t instanceof Error) {
-                        Error e = (Error) t;
-                        if (e.getError() == 233) {
-                            // Not an error on HP-UX so log as a warning
-                            // so it can be filtered out on that platform
-                            // See bug 50273
-                            log.warn(msg, t);
-                        } else {
-                            log.error(msg, t);
-                        }
-                    } else {
-                            log.error(msg, t);
-                    }
-                }
-                // The processor will recycle itself when it finishes
-            }
-            state = AcceptorState.ENDED;
-        }
-    }
-
-
     // -------------------------------------------------- SocketInfo Inner Class
 
     public static class SocketInfo {

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Wed Dec  7 09:28:40 2016
@@ -57,7 +57,7 @@ import org.apache.tomcat.util.net.jsse.J
 /**
  * NIO2 endpoint.
  */
-public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
+public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousSocketChannel> {
 
 
     // -------------------------------------------------------------- Constants
@@ -287,8 +287,8 @@ public class Nio2Endpoint extends Abstra
     }
 
     @Override
-    protected AbstractEndpoint.Acceptor createAcceptor() {
-        return new Acceptor();
+    protected Acceptor<AsynchronousSocketChannel> createAcceptor() {
+        return new Acceptor<>(this);
     }
 
     /**
@@ -298,6 +298,7 @@ public class Nio2Endpoint extends Abstra
      *  and processing may continue, <code>false</code> if the socket needs to be
      *  close immediately
      */
+    @Override
     protected boolean setSocketOptions(AsynchronousSocketChannel socket) {
         try {
             socketProperties.setProperties(socket);
@@ -333,13 +334,44 @@ public class Nio2Endpoint extends Abstra
 
 
     @Override
+    protected void closeSocket(AsynchronousSocketChannel socket) {
+        countDownConnection();
+        try {
+            socket.close();
+        } catch (IOException ioe) {
+            if (log.isDebugEnabled()) {
+                log.debug(sm.getString("endpoint.err.close"), ioe);
+            }
+        }
+    }
+
+
+    @Override
+    protected NetworkChannel getServerSocket() {
+        return serverSock;
+    }
+
+
+    @Override
+    protected AsynchronousSocketChannel serverSocketAccept() throws Exception {
+        return serverSock.accept().get();
+    }
+
+
+    @Override
+    protected Log getLog() {
+        return log;
+    }
+
+
+    @Override
     protected SocketProcessorBase<Nio2Channel> createSocketProcessor(
             SocketWrapperBase<Nio2Channel> socketWrapper, SocketEvent event) {
         return new SocketProcessor(socketWrapper, event);
     }
 
 
-    public void closeSocket(SocketWrapperBase<Nio2Channel> socket) {
+    private void closeSocket(SocketWrapperBase<Nio2Channel> socket) {
         if (log.isDebugEnabled()) {
             log.debug("Calling [" + this + "].closeSocket([" + socket + "],[" + socket.getSocket() + "])",
                     new Exception());
@@ -377,105 +409,6 @@ public class Nio2Endpoint extends Abstra
         }
     }
 
-    @Override
-    protected Log getLog() {
-        return log;
-    }
-
-
-    @Override
-    protected NetworkChannel getServerSocket() {
-        return serverSock;
-    }
-
-
-    // --------------------------------------------------- Acceptor Inner Class
-    /**
-     * With NIO2, the main acceptor thread only initiates the initial accept
-     * but periodically checks that the connector is still accepting (if not
-     * it will attempt to start again).
-     */
-    protected class Acceptor extends AbstractEndpoint.Acceptor {
-
-        @Override
-        public void run() {
-
-            int errorDelay = 0;
-
-            // Loop until we receive a shutdown command
-            while (running) {
-
-                // Loop if endpoint is paused
-                while (paused && running) {
-                    state = AcceptorState.PAUSED;
-                    try {
-                        Thread.sleep(50);
-                    } catch (InterruptedException e) {
-                        // Ignore
-                    }
-                }
-
-                if (!running) {
-                    break;
-                }
-                state = AcceptorState.RUNNING;
-
-                try {
-                    //if we have reached max connections, wait
-                    countUpOrAwaitConnection();
-
-                    AsynchronousSocketChannel socket = null;
-                    try {
-                        // Accept the next incoming connection from the server
-                        // socket
-                        socket = serverSock.accept().get();
-                    } catch (Exception e) {
-                        // We didn't get a socket
-                        countDownConnection();
-                        if (running) {
-                            // Introduce delay if necessary
-                            errorDelay = handleExceptionWithDelay(errorDelay);
-                            // re-throw
-                            throw e;
-                        } else {
-                            break;
-                        }
-                    }
-                    // Successful accept, reset the error delay
-                    errorDelay = 0;
-
-                    // Configure the socket
-                    if (running && !paused) {
-                        // setSocketOptions() will hand the socket off to
-                        // an appropriate processor if successful
-                        if (!setSocketOptions(socket)) {
-                            closeSocket(socket);
-                       }
-                    } else {
-                        closeSocket(socket);
-                    }
-                } catch (Throwable t) {
-                    ExceptionUtils.handleThrowable(t);
-                    log.error(sm.getString("endpoint.accept.fail"), t);
-                }
-            }
-            state = AcceptorState.ENDED;
-        }
-
-
-        private void closeSocket(AsynchronousSocketChannel socket) {
-            countDownConnection();
-            try {
-                socket.close();
-            } catch (IOException ioe) {
-                if (log.isDebugEnabled()) {
-                    log.debug(sm.getString("endpoint.err.close"), ioe);
-                }
-            }
-        }
-    }
-
-
     public static class Nio2SocketWrapper extends SocketWrapperBase<Nio2Channel> {
 
         private static final ThreadLocal<AtomicInteger> nestedWriteCompletionCount =

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed Dec  7 09:28:40 2016
@@ -66,7 +66,7 @@ import org.apache.tomcat.util.net.jsse.J
  * @author Mladen Turk
  * @author Remy Maucherat
  */
-public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {
+public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
 
 
     // -------------------------------------------------------------- Constants
@@ -340,8 +340,8 @@ public class NioEndpoint extends Abstrac
 
 
     @Override
-    protected AbstractEndpoint.Acceptor createAcceptor() {
-        return new Acceptor();
+    protected Acceptor<SocketChannel> createAcceptor() {
+        return new Acceptor<>(this);
     }
 
 
@@ -352,6 +352,7 @@ public class NioEndpoint extends Abstrac
      *  and processing may continue, <code>false</code> if the socket needs to be
      *  close immediately
      */
+    @Override
     protected boolean setSocketOptions(SocketChannel socket) {
         // Process the connection
         try {
@@ -391,8 +392,22 @@ public class NioEndpoint extends Abstrac
 
 
     @Override
-    protected Log getLog() {
-        return log;
+    protected void closeSocket(SocketChannel socket) {
+        countDownConnection();
+        try {
+            socket.socket().close();
+        } catch (IOException ioe)  {
+            if (log.isDebugEnabled()) {
+                log.debug(sm.getString("endpoint.err.close"), ioe);
+            }
+        }
+        try {
+            socket.close();
+        } catch (IOException ioe) {
+            if (log.isDebugEnabled()) {
+                log.debug(sm.getString("endpoint.err.close"), ioe);
+            }
+        }
     }
 
 
@@ -402,96 +417,15 @@ public class NioEndpoint extends Abstrac
     }
 
 
-    // --------------------------------------------------- Acceptor Inner Class
-    /**
-     * The background thread that listens for incoming TCP/IP connections and
-     * hands them off to an appropriate processor.
-     */
-    protected class Acceptor extends AbstractEndpoint.Acceptor {
-
-        @Override
-        public void run() {
-
-            int errorDelay = 0;
-
-            // Loop until we receive a shutdown command
-            while (running) {
-
-                // Loop if endpoint is paused
-                while (paused && running) {
-                    state = AcceptorState.PAUSED;
-                    try {
-                        Thread.sleep(50);
-                    } catch (InterruptedException e) {
-                        // Ignore
-                    }
-                }
-
-                if (!running) {
-                    break;
-                }
-                state = AcceptorState.RUNNING;
-
-                try {
-                    //if we have reached max connections, wait
-                    countUpOrAwaitConnection();
-
-                    SocketChannel socket = null;
-                    try {
-                        // Accept the next incoming connection from the server
-                        // socket
-                        socket = serverSock.accept();
-                    } catch (IOException ioe) {
-                        // We didn't get a socket
-                        countDownConnection();
-                        if (running) {
-                            // Introduce delay if necessary
-                            errorDelay = handleExceptionWithDelay(errorDelay);
-                            // re-throw
-                            throw ioe;
-                        } else {
-                            break;
-                        }
-                    }
-                    // Successful accept, reset the error delay
-                    errorDelay = 0;
-
-                    // Configure the socket
-                    if (running && !paused) {
-                        // setSocketOptions() will hand the socket off to
-                        // an appropriate processor if successful
-                        if (!setSocketOptions(socket)) {
-                            closeSocket(socket);
-                        }
-                    } else {
-                        closeSocket(socket);
-                    }
-                } catch (Throwable t) {
-                    ExceptionUtils.handleThrowable(t);
-                    log.error(sm.getString("endpoint.accept.fail"), t);
-                }
-            }
-            state = AcceptorState.ENDED;
-        }
+    @Override
+    protected SocketChannel serverSocketAccept() throws Exception {
+        return serverSock.accept();
+    }
 
 
-        private void closeSocket(SocketChannel socket) {
-            countDownConnection();
-            try {
-                socket.socket().close();
-            } catch (IOException ioe)  {
-                if (log.isDebugEnabled()) {
-                    log.debug(sm.getString("endpoint.err.close"), ioe);
-                }
-            }
-            try {
-                socket.close();
-            } catch (IOException ioe) {
-                if (log.isDebugEnabled()) {
-                    log.debug(sm.getString("endpoint.err.close"), ioe);
-                }
-            }
-        }
+    @Override
+    protected Log getLog() {
+        return log;
     }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Wed Dec  7 09:28:40 2016
@@ -38,7 +38,7 @@ public abstract class SocketWrapperBase<
     protected static final StringManager sm = StringManager.getManager(SocketWrapperBase.class);
 
     private final E socket;
-    private final AbstractEndpoint<E> endpoint;
+    private final AbstractEndpoint<E,?> endpoint;
 
     // Volatile because I/O and setting the timeout values occurs on a different
     // thread to the thread checking the timeout.
@@ -90,7 +90,7 @@ public abstract class SocketWrapperBase<
      */
     protected int bufferedWriteSize = 64 * 1024; // 64k default write buffer
 
-    public SocketWrapperBase(E socket, AbstractEndpoint<E> endpoint) {
+    public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) {
         this.socket = socket;
         this.endpoint = endpoint;
         ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -102,7 +102,7 @@ public abstract class SocketWrapperBase<
         return socket;
     }
 
-    public AbstractEndpoint<E> getEndpoint() {
+    public AbstractEndpoint<E,?> getEndpoint() {
         return endpoint;
     }
 

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java Wed Dec  7 09:28:40 2016
@@ -222,7 +222,7 @@ public class WsRemoteEndpointImplServer
         if (sh != null) {
             if (useDispatch) {
                 OnResultRunnable r = new OnResultRunnable(sh, t);
-                AbstractEndpoint<?> endpoint = socketWrapper.getEndpoint();
+                AbstractEndpoint<?,?> endpoint = socketWrapper.getEndpoint();
                 Executor containerExecutor = endpoint.getExecutor();
                 if (endpoint.isRunning() && containerExecutor != null) {
                     containerExecutor.execute(r);

Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1773036&r1=1773035&r2=1773036&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Wed Dec  7 09:28:40 2016
@@ -52,6 +52,10 @@
         with a JSSE connector and an explicit alias has not been configured.
         (markt)
       </fix>
+      <scode>
+        Extract the common Acceptor code from each Endpoint into a new Acceptor
+        class that is used by all Endpoints. (markt)
+      </scode>
     </changelog>
   </subsection>
 </section>




---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Re: svn commit: r1773036 - in /tomcat/trunk: java/org/apache/coyote/ java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/ java/org/apache/tomcat/util/net/ java/org/apache/tomcat/websocket/server/ webapps/docs/

Posted by Mark Thomas <ma...@apache.org>.
On 07/12/2016 09:28, markt@apache.org wrote:
> Author: markt
> Date: Wed Dec  7 09:28:40 2016
> New Revision: 1773036
> 
> URL: http://svn.apache.org/viewvc?rev=1773036&view=rev
> Log:
> Refactor the per Endpoint Acceptors into a single Acceptor class.

Some additional commentary:

- The I/O refactoring still feels like a work in progress. My aim
  remains reducing duplication and edge case differences between the
  three Endpoint implementations.

- Endpoint does reach quite a long way up the I/O stack. That doesn't
  seem quite right but I haven't looked at it closely.

- I ran the unit tests and they all passed after these changes.

- I'm on the fence whether to back-port this to 8.5.x. or not.

Mark

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org