You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2019/10/29 18:04:23 UTC

[tomcat] branch master updated: Move connection tracking to the endpoint

This is an automated email from the ASF dual-hosted git repository.

remm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/master by this push:
     new 49d37df  Move connection tracking to the endpoint
49d37df is described below

commit 49d37dffb486f5c15a22d14fe858b3d7b12a0d66
Author: remm <re...@apache.org>
AuthorDate: Tue Oct 29 19:04:13 2019 +0100

    Move connection tracking to the endpoint
    
    It requires far fewer operations as the socket wrapper simply references
    the processor. A map then tracks the wrappers on open/close, instead of
    performing multiple map operations every time a socket is processed.
    The return type of getOpenSockets() has been changed to
    Set<SocketWrapperBase<S>>. This is not mandatory but more convenient. If
    it is likely to cause problems, it can be changed back.
---
 java/org/apache/coyote/AbstractProtocol.java       | 31 +++++++++++-----------
 .../apache/tomcat/util/net/AbstractEndpoint.java   | 14 ++++++++--
 java/org/apache/tomcat/util/net/AprEndpoint.java   |  7 ++---
 java/org/apache/tomcat/util/net/Nio2Endpoint.java  |  5 ++--
 java/org/apache/tomcat/util/net/NioEndpoint.java   |  1 +
 .../apache/tomcat/util/net/SocketWrapperBase.java  | 11 ++++++++
 webapps/docs/changelog.xml                         |  4 +++
 7 files changed, 51 insertions(+), 22 deletions(-)

diff --git a/java/org/apache/coyote/AbstractProtocol.java b/java/org/apache/coyote/AbstractProtocol.java
index 61180a8..9cfa6e9 100644
--- a/java/org/apache/coyote/AbstractProtocol.java
+++ b/java/org/apache/coyote/AbstractProtocol.java
@@ -19,7 +19,6 @@ package org.apache.coyote;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -731,7 +730,6 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
         private final AbstractProtocol<S> proto;
         private final RequestGroupInfo global = new RequestGroupInfo();
         private final AtomicLong registerCount = new AtomicLong(0);
-        private final Map<S,Processor> connections = new ConcurrentHashMap<>();
         private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);
 
         public ConnectionHandler(AbstractProtocol<S> proto) {
@@ -770,7 +768,7 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
 
             S socket = wrapper.getSocket();
 
-            Processor processor = connections.get(socket);
+            Processor processor = (Processor) wrapper.getCurrentProcessor();
             if (getLog().isDebugEnabled()) {
                 getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
                         processor, socket));
@@ -854,7 +852,7 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                         wrapper.getSslSupport(getProtocol().getClientCertProvider()));
 
                 // Associate the processor with the connection
-                connections.put(socket, processor);
+                wrapper.setCurrentProcessor(processor);
 
                 SocketState state = SocketState.CLOSED;
                 do {
@@ -873,7 +871,7 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                                         wrapper, getProtocol().getAdapter());
                                 wrapper.unRead(leftOverInput);
                                 // Associate with the processor with the connection
-                                connections.put(socket, processor);
+                                wrapper.setCurrentProcessor(processor);
                             } else {
                                 if (getLog().isDebugEnabled()) {
                                     getLog().debug(sm.getString(
@@ -896,7 +894,7 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                             // Mark the connection as upgraded
                             wrapper.setUpgraded(true);
                             // Associate with the processor with the connection
-                            connections.put(socket, processor);
+                            wrapper.setCurrentProcessor(processor);
                             // Initialise the upgrade handler (which may trigger
                             // some IO using the new protocol which is why the lines
                             // above are necessary)
@@ -934,7 +932,7 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                 } else if (state == SocketState.OPEN) {
                     // In keep-alive but between requests. OK to recycle
                     // processor. Continue to poll for the next request.
-                    connections.remove(socket);
+                    wrapper.setCurrentProcessor(null);
                     release(processor);
                     wrapper.registerReadInterest();
                 } else if (state == SocketState.SENDFILE) {
@@ -960,7 +958,7 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                     // Connection closed. OK to recycle the processor.
                     // Processors handling upgrades require additional clean-up
                     // before release.
-                    connections.remove(socket);
+                    wrapper.setCurrentProcessor(null);
                     if (processor.isUpgrade()) {
                         UpgradeToken upgradeToken = processor.getUpgradeToken();
                         HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
@@ -1020,7 +1018,7 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
 
             // Make sure socket/processor is removed from the list of current
             // connections
-            connections.remove(socket);
+            wrapper.setCurrentProcessor(null);
             release(processor);
             return SocketState.CLOSED;
         }
@@ -1039,8 +1037,8 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
 
 
         @Override
-        public Set<S> getOpenSockets() {
-            return connections.keySet();
+        public Set<SocketWrapperBase<S>> getOpenSockets() {
+            return proto.getEndpoint().getConnections();
         }
 
 
@@ -1083,8 +1081,8 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
          */
         @Override
         public void release(SocketWrapperBase<S> socketWrapper) {
-            S socket = socketWrapper.getSocket();
-            Processor processor = connections.remove(socket);
+            Processor processor = (Processor) socketWrapper.getCurrentProcessor();
+            socketWrapper.setCurrentProcessor(null);
             release(processor);
         }
 
@@ -1152,8 +1150,11 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
              * Note that even if the endpoint is resumed, there is (currently)
              * no API to inform the Processors of this.
              */
-            for (Processor processor : connections.values()) {
-                processor.pause();
+            for (SocketWrapperBase<S> wrapper : proto.getEndpoint().getConnections()) {
+                Processor processor = (Processor) wrapper.getCurrentProcessor();
+                if (processor != null) {
+                    processor.pause();
+                }
             }
         }
     }
diff --git a/java/org/apache/tomcat/util/net/AbstractEndpoint.java b/java/org/apache/tomcat/util/net/AbstractEndpoint.java
index bbe163d..d0188e6 100644
--- a/java/org/apache/tomcat/util/net/AbstractEndpoint.java
+++ b/java/org/apache/tomcat/util/net/AbstractEndpoint.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -101,10 +102,10 @@ public abstract class AbstractEndpoint<S,U> {
         /**
          * Obtain the currently open sockets.
          *
-         * @return The sockets for which the handler is tracking a currently
+         * @return The socket wrappers for which the handler is tracking a currently
          *         open connection
          */
-        public Set<S> getOpenSockets();
+        public Set<SocketWrapperBase<S>> getOpenSockets();
 
         /**
          * Release any resources associated with the given SocketWrapper.
@@ -183,6 +184,15 @@ public abstract class AbstractEndpoint<S,U> {
 
     private ObjectName oname = null;
 
+    /**
+     * Connection structure holding all current connections.
+     */
+    protected Map<SocketWrapperBase<S>, SocketWrapperBase<S>> connections = new ConcurrentHashMap<>();
+
+    public Set<SocketWrapperBase<S>> getConnections() {
+        return connections.keySet();
+    }
+
     // ----------------------------------------------------------------- Properties
 
     private String defaultSSLHostConfigName = SSLHostConfig.DEFAULT_SSL_HOST_NAME;
diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java
index 997c01a..5629154 100644
--- a/java/org/apache/tomcat/util/net/AprEndpoint.java
+++ b/java/org/apache/tomcat/util/net/AprEndpoint.java
@@ -684,6 +684,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                 log.debug(sm.getString("endpoint.debug.socket", socket));
             }
             AprSocketWrapper wrapper = new AprSocketWrapper(socket, this);
+            super.connections.put(wrapper, wrapper);
             wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
             wrapper.setSecure(isSSLEnabled());
             wrapper.setReadTimeout(getConnectionTimeout());
@@ -1992,8 +1993,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                         return;
                     }
                     // Process the request from this socket
-                    Handler.SocketState state = getHandler().process(socket,
-                            SocketEvent.OPEN_READ);
+                    Handler.SocketState state = getHandler().process(socket, SocketEvent.OPEN_READ);
                     if (state == Handler.SocketState.CLOSED) {
                         // Close socket and pool
                         closeSocket(socket.getSocket().longValue());
@@ -2002,6 +2002,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                 }
             }
         }
+
     }
 
 
@@ -2012,7 +2013,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
      * This class is the equivalent of the Worker, but will simply use in an
      * external Executor thread pool.
      */
-    protected class SocketProcessor extends  SocketProcessorBase<Long> {
+    protected class SocketProcessor extends SocketProcessorBase<Long> {
 
         public SocketProcessor(SocketWrapperBase<Long> socketWrapper, SocketEvent event) {
             super(socketWrapper, event);
diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index 3fe951c..1ac7025 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -212,8 +212,8 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                 public void run() {
                     // Then close all active connections if any remain
                     try {
-                        for (Nio2Channel channel : getHandler().getOpenSockets()) {
-                            channel.getSocketWrapper().close();
+                        for (SocketWrapperBase<Nio2Channel> wrapper : getConnections()) {
+                            wrapper.close();
                         }
                     } catch (Throwable t) {
                         ExceptionUtils.handleThrowable(t);
@@ -320,6 +320,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                 }
             }
             Nio2SocketWrapper socketWrapper = new Nio2SocketWrapper(channel, this);
+            connections.put(socketWrapper, socketWrapper);
             channel.reset(socket, socketWrapper);
             socketWrapper.setReadTimeout(getConnectionTimeout());
             socketWrapper.setWriteTimeout(getConnectionTimeout());
diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java
index f62607b..9ba8262 100644
--- a/java/org/apache/tomcat/util/net/NioEndpoint.java
+++ b/java/org/apache/tomcat/util/net/NioEndpoint.java
@@ -417,6 +417,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
             } else {
             }
             NioSocketWrapper socketWrapper = new NioSocketWrapper(channel, this);
+            connections.put(socketWrapper, socketWrapper);
             channel.reset(socket, socketWrapper);
             socketWrapper.setReadTimeout(getConnectionTimeout());
             socketWrapper.setWriteTimeout(getConnectionTimeout());
diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
index 2c082d6..cb9460e 100644
--- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
+++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
@@ -108,6 +108,8 @@ public abstract class SocketWrapperBase<E> {
     protected final Semaphore writePending;
     protected volatile OperationState<?> writeOperation = null;
 
+    protected Object currentProcessor = null;
+
     public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) {
         this.socket = socket;
         this.endpoint = endpoint;
@@ -135,6 +137,14 @@ public abstract class SocketWrapperBase<E> {
         return endpoint;
     }
 
+    public Object getCurrentProcessor() {
+        return currentProcessor;
+    }
+
+    public void setCurrentProcessor(Object currentProcessor) {
+        this.currentProcessor = currentProcessor;
+    }
+
     /**
      * Transfers processing to a container thread.
      *
@@ -399,6 +409,7 @@ public abstract class SocketWrapperBase<E> {
                     log.error(sm.getString("endpoint.debug.handlerRelease"), e);
                 }
             }
+            getEndpoint().connections.remove(this);
             doClose();
         }
     }
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 1705743..a2dadbe 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -105,6 +105,10 @@
         <bug>63879</bug>: Remove stack trace from debug logging on socket
         wrapper close. (remm)
       </fix>
+      <fix>
+        Move connection tracking to the endpoint, since it requires far fewer
+        operations. (remm)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Other">


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org