You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2022/10/17 16:32:27 UTC

[httpcomponents-core] 01/01: HTTPCORE-726: Improved capacity management in SharedInputBuffer

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch HTTPCORE-726
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit dc179e1416c65edc36de49cf8edf42ab1e941833
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Mon Oct 17 17:29:06 2022 +0200

    HTTPCORE-726: Improved capacity management in SharedInputBuffer
---
 .../nio/support/classic/SharedInputBuffer.java     | 37 ++++++++++++++--------
 .../nio/support/classic/TestSharedInputBuffer.java |  2 +-
 2 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java
index b19367540..45dea032c 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java
@@ -29,6 +29,7 @@ package org.apache.hc.core5.http.nio.support.classic;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hc.core5.annotation.Contract;
@@ -41,14 +42,19 @@ import org.apache.hc.core5.http.nio.CapacityChannel;
 @Contract(threading = ThreadingBehavior.SAFE)
 public final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer {
 
+    private final int initialBufferSize;
+    private final AtomicInteger capacityIncrement;
+
     private volatile CapacityChannel capacityChannel;
 
     public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) {
         super(lock, initialBufferSize);
+        this.initialBufferSize = initialBufferSize;
+        this.capacityIncrement = new AtomicInteger(0);
     }
 
     public SharedInputBuffer(final int bufferSize) {
-        super(new ReentrantLock(), bufferSize);
+        this(new ReentrantLock(), bufferSize);
     }
 
     public int fill(final ByteBuffer src) {
@@ -65,13 +71,22 @@ public final class SharedInputBuffer extends AbstractSharedBuffer implements Con
         }
     }
 
+    private void incrementCapacity() throws IOException {
+        if (capacityChannel != null) {
+            final int increment = capacityIncrement.getAndSet(0);
+            if (increment > 0) {
+                capacityChannel.update(increment);
+            }
+        }
+    }
+
     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
         lock.lock();
         try {
             this.capacityChannel = capacityChannel;
             setInputMode();
-            if (buffer().hasRemaining()) {
-                capacityChannel.update(buffer().remaining());
+            if (buffer().position() == 0) {
+                capacityChannel.update(initialBufferSize);
             }
         } finally {
             lock.unlock();
@@ -106,11 +121,9 @@ public final class SharedInputBuffer extends AbstractSharedBuffer implements Con
                 return -1;
             }
             final int b = buffer().get() & 0xff;
-            if (!buffer().hasRemaining() && capacityChannel != null) {
-                setInputMode();
-                if (buffer().hasRemaining()) {
-                    capacityChannel.update(buffer().remaining());
-                }
+            capacityIncrement.incrementAndGet();
+            if (!buffer().hasRemaining()) {
+                incrementCapacity();
             }
             return b;
         } finally {
@@ -132,11 +145,9 @@ public final class SharedInputBuffer extends AbstractSharedBuffer implements Con
             }
             final int chunk = Math.min(buffer().remaining(), len);
             buffer().get(b, off, chunk);
-            if (!buffer().hasRemaining() && capacityChannel != null) {
-                setInputMode();
-                if (buffer().hasRemaining()) {
-                    capacityChannel.update(buffer().remaining());
-                }
+            capacityIncrement.addAndGet(chunk);
+            if (!buffer().hasRemaining()) {
+                incrementCapacity();
             }
             return chunk;
         } finally {
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedInputBuffer.java b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedInputBuffer.java
index b02fc5c1d..58da1fad6 100644
--- a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedInputBuffer.java
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedInputBuffer.java
@@ -154,7 +154,7 @@ public class TestSharedInputBuffer {
 
         Assert.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
         Assert.assertEquals(Integer.valueOf('a'), task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
-        Mockito.verify(capacityChannel).update(10);
+        Mockito.verify(capacityChannel).update(1);
     }
 
     @Test