You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by rs...@apache.org on 2019/02/23 23:48:09 UTC

[httpcomponents-core] branch master updated: Add a thread-safe capacity channel to AbstractHttp1StreamDuplexer

This is an automated email from the ASF dual-hosted git repository.

rschmitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 827d6dc  Add a thread-safe capacity channel to AbstractHttp1StreamDuplexer
827d6dc is described below

commit 827d6dc5323839cc76697f873bdf4be2ba74283c
Author: Richard Hernandez <ri...@amazon.com>
AuthorDate: Thu Feb 21 00:09:18 2019 -0800

    Add a thread-safe capacity channel to AbstractHttp1StreamDuplexer
    
    This solves two issues:
    
    1. A data consumer that retains a reference to an old capacity channel
    will no longer be able to modify state for future requests occurring on
    the same connection (e.g. updating the window or requesting reads).
    
    2. Coupling changes to the input window with changes to the io session
    state prevents a race condition occurring when a consumer asynchronously
    calls CapacityChannel#update at the same moment the input window is
    exhausted by new input. Without synchronization, it was possible (albeit
    unlikely) for the IO thread to decrease the window below zero, a
    consumer thread to request input by calling CapacityChannel#update, then
    the IO thread to clear the read request.
---
 .../http/impl/nio/AbstractHttp1StreamDuplexer.java |  95 ++++++++++++-------
 ...tAbstractHttp1StreamDuplexerCapacityWindow.java | 102 +++++++++++++++++++++
 2 files changed, 164 insertions(+), 33 deletions(-)

diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
index 0c7d9aa..37b144a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
@@ -71,6 +71,7 @@ import org.apache.hc.core5.io.CloseMode;
 import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
 import org.apache.hc.core5.reactor.Command;
 import org.apache.hc.core5.reactor.EventMask;
+import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.reactor.ProtocolIOSession;
 import org.apache.hc.core5.reactor.ssl.TlsDetails;
 import org.apache.hc.core5.util.Args;
@@ -93,13 +94,13 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
     private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
     private final ContentLengthStrategy incomingContentStrategy;
     private final ContentLengthStrategy outgoingContentStrategy;
-    private final AtomicInteger inputWindow;
     private final ByteBuffer contentBuffer;
     private final AtomicInteger outputRequests;
 
     private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
     private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
     private volatile ConnectionState connState;
+    private volatile CapacityWindow capacityWindow;
 
     private volatile ProtocolVersion version;
     private volatile EndpointDetails endpointDetails;
@@ -129,7 +130,6 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
                 DefaultContentLengthStrategy.INSTANCE;
         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
                 DefaultContentLengthStrategy.INSTANCE;
-        this.inputWindow = new AtomicInteger(0);
         this.contentBuffer = ByteBuffer.allocate(this.h1Config.getBufferSize());
         this.outputRequests = new AtomicInteger(0);
         this.connState = ConnectionState.READY;
@@ -237,19 +237,6 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
         processCommands();
     }
 
-    private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
-        for (;;) {
-            final int current = window.get();
-            final long newValue = (long) current + delta;
-            if (Math.abs(newValue) > 0x7fffffffL) {
-                throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
-            }
-            if (window.compareAndSet(current, (int) newValue)) {
-                return (int) newValue;
-            }
-        }
-    }
-
     public final void onInput() throws HttpException, IOException {
         while (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
             int totalBytesRead = 0;
@@ -285,7 +272,7 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
                             consumeHeader(messageHead, null);
                             contentDecoder = null;
                         }
-                        inputWindow.set(h1Config.getInitialWindowSize());
+                        capacityWindow = new CapacityWindow(h1Config.getInitialWindowSize(), ioSession);
                         if (contentDecoder != null) {
                             incomingMessage = new Message<>(messageHead, contentDecoder);
                             break;
@@ -322,30 +309,17 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
                     contentBuffer.flip();
                     consumeData(contentBuffer);
                     contentBuffer.clear();
-                    final int capacity = updateWindow(inputWindow, -bytesRead);
+                    final int capacity = capacityWindow.removeCapacity(bytesRead);
                     if (capacity <= 0) {
                         if (!contentDecoder.isCompleted()) {
-                            ioSession.clearEvent(SelectionKey.OP_READ);
-                            updateCapacity(new CapacityChannel() {
-
-                                @Override
-                                public void update(final int increment) throws IOException {
-                                    if (increment > 0) {
-                                        final int capacity = inputWindow.get();
-                                        final int remaining = capacity > 0 ? Integer.MAX_VALUE - capacity : Integer.MAX_VALUE;
-                                        final int chunk = Math.min(increment, remaining);
-                                        updateWindow(inputWindow, chunk);
-                                        requestSessionInput();
-                                    }
-                                }
-
-                            });
+                            updateCapacity(capacityWindow);
                         }
                         break;
                     }
                 }
                 if (contentDecoder.isCompleted()) {
                     dataEnd(contentDecoder.getTrailers());
+                    capacityWindow.close();
                     incomingMessage = null;
                     ioSession.setEvent(SelectionKey.OP_READ);
                     inputEnd();
@@ -627,7 +601,62 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
         buf.append("connState=").append(connState)
                 .append(", inbuf=").append(inbuf)
                 .append(", outbuf=").append(outbuf)
-                .append(", inputWindow=").append(inputWindow);
+                .append(", inputWindow=").append(capacityWindow != null ? capacityWindow.getWindow() : 0);
     }
 
+    static class CapacityWindow implements CapacityChannel {
+        private final IOSession ioSession;
+        private int window;
+        private boolean closed;
+
+        CapacityWindow(final int window, final IOSession ioSession) {
+            this.window = window;
+            this.ioSession = ioSession;
+        }
+
+        @Override
+        public synchronized void update(final int increment) throws IOException {
+            if (closed) {
+                return;
+            }
+            if (increment > 0) {
+                updateWindow(increment);
+                ioSession.setEvent(SelectionKey.OP_READ);
+            }
+        }
+
+        /**
+         * Internal method for removing capacity. We don't need to check
+         * if this channel is closed in it.
+         */
+        synchronized int removeCapacity(final int delta) {
+            updateWindow(-delta);
+            if (window <= 0) {
+                ioSession.clearEvent(SelectionKey.OP_READ);
+            }
+            return window;
+        }
+
+        private void updateWindow(final int delta) {
+            int newValue = window + delta;
+            // Math.addExact
+            if (((window ^ newValue) & (delta ^ newValue)) < 0) {
+                newValue = delta < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
+            }
+            window = newValue;
+        }
+
+        /**
+         * Closes the capacity channel, preventing user code from accidentally requesting
+         * read events outside of the context of the request the channel was created for
+         */
+        synchronized void close() {
+            closed = true;
+        }
+
+        // visible for testing
+        int getWindow() {
+            return window;
+        }
+    }
 }
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/impl/nio/TestAbstractHttp1StreamDuplexerCapacityWindow.java b/httpcore5/src/test/java/org/apache/hc/core5/http/impl/nio/TestAbstractHttp1StreamDuplexerCapacityWindow.java
new file mode 100644
index 0000000..de22ec7
--- /dev/null
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/impl/nio/TestAbstractHttp1StreamDuplexerCapacityWindow.java
@@ -0,0 +1,102 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.impl.nio;
+
+import org.apache.hc.core5.http.impl.nio.AbstractHttp1StreamDuplexer.CapacityWindow;
+import org.apache.hc.core5.reactor.IOSession;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class TestAbstractHttp1StreamDuplexerCapacityWindow {
+    @Mock private IOSession ioSession;
+
+    @Before
+    public void before() {
+        initMocks(this);
+    }
+
+    @Test
+    public void testWindowUpdate() throws IOException {
+        final CapacityWindow window = new CapacityWindow(0, ioSession);
+        window.update(1);
+        assertEquals(1, window.getWindow());
+        verify(ioSession).setEvent(eq(SelectionKey.OP_READ));
+        verifyNoMoreInteractions(ioSession);
+    }
+
+    @Test
+    public void testRemoveCapacity() {
+        final CapacityWindow window = new CapacityWindow(1, ioSession);
+        window.removeCapacity(1);
+        assertEquals(0, window.getWindow());
+        verify(ioSession).clearEvent(eq(SelectionKey.OP_READ));
+        verifyNoMoreInteractions(ioSession);
+    }
+
+    @Test
+    public void noReadsSetAfterWindowIsClosed() throws IOException {
+        final CapacityWindow window = new CapacityWindow(1, ioSession);
+        window.close();
+        window.update(1);
+        verifyZeroInteractions(ioSession);
+    }
+
+    @Test
+    public void windowCannotUnderflow() {
+        final CapacityWindow window = new CapacityWindow(Integer.MIN_VALUE, ioSession);
+        window.removeCapacity(1);
+        assertEquals(Integer.MIN_VALUE, window.getWindow());
+    }
+
+    @Test
+    public void windowCannotOverflow() throws IOException{
+        final CapacityWindow window = new CapacityWindow(Integer.MAX_VALUE, ioSession);
+        window.update(1);
+        assertEquals(Integer.MAX_VALUE, window.getWindow());
+    }
+
+    @Test
+    public void noChangesIfUpdateIsNonPositive() throws IOException {
+        final CapacityWindow window = new CapacityWindow(1, ioSession);
+        window.update(0);
+        window.update(-1);
+        assertEquals(1, window.getWindow());
+        verifyZeroInteractions(ioSession);
+    }
+}