You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by rs...@apache.org on 2019/12/23 20:02:39 UTC

[httpcomponents-core] branch master updated: ReactiveDataConsumer: Synchronize `flushToSubscriber`

This is an automated email from the ASF dual-hosted git repository.

rschmitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git


The following commit(s) were added to refs/heads/master by this push:
     new c5f6e5f  ReactiveDataConsumer: Synchronize `flushToSubscriber`
c5f6e5f is described below

commit c5f6e5f3a8cae07895026a9ccb87450f60e9e9c4
Author: Ryan Schmitt <rs...@apache.org>
AuthorDate: Sun Dec 22 15:53:07 2019 -0800

    ReactiveDataConsumer: Synchronize `flushToSubscriber`
    
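    It is possible for two threads to race in such a way that a call to a
    method like `ReactiveDataConsumer#streamEnd` will never result in a call
    to `Subscriber#onComplete` from either thread. The `flushInProgress`
    flag makes a second, concurrent caller of `flushToSubscriber` return
    immediately, so a state change made after the first caller has already
    passed the corresponding check is never flushed. These races are solved
    by synchronizing the `flushToSubscriber` method, which is currently only
    protected against reentrant calls from the same thread.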
---
 .../hc/core5/reactive/ReactiveDataConsumer.java    | 67 +++++++++++-----------
 1 file changed, 35 insertions(+), 32 deletions(-)

diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
index fc7c0f5..755c336 100644
--- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
@@ -58,6 +58,7 @@ final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher<ByteBuf
 
     private final BlockingQueue<ByteBuffer> buffers = new LinkedBlockingQueue<>();
     private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
+    private final Object flushLock = new Object();
     private final AtomicInteger windowScalingIncrement = new AtomicInteger(0);
     private volatile boolean buffersSent = false;
     private volatile boolean cancelled = false;
@@ -128,43 +129,45 @@ final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher<ByteBuf
     }
 
     private void flushToSubscriber() {
-        final Subscriber<? super ByteBuffer> s = subscriber;
-        if (flushInProgress.getAndSet(true)) {
-            return;
-        }
-        try {
-            if (s == null) {
-                return;
-            }
-            if (exception != null) {
-                subscriber = null;
-                s.onError(exception);
+        synchronized (flushLock) {
+            final Subscriber<? super ByteBuffer> s = subscriber;
+            if (flushInProgress.getAndSet(true)) {
                 return;
             }
-            ByteBuffer next;
-            while (requests.get() > 0 && ((next = buffers.poll()) != null)) {
-                final int bytesFreed = next.remaining();
-                s.onNext(next);
-                buffersSent = true;
-                requests.decrementAndGet();
-                windowScalingIncrement.addAndGet(bytesFreed);
-            }
-            final CapacityChannel localChannel = capacityChannel;
-            if (localChannel != null) {
-                try {
-                    signalCapacity(localChannel);
-                } catch (final IOException e) {
-                    exception = e;
-                    s.onError(e);
+            try {
+                if (s == null) {
                     return;
                 }
+                if (exception != null) {
+                    subscriber = null;
+                    s.onError(exception);
+                    return;
+                }
+                ByteBuffer next;
+                while (requests.get() > 0 && ((next = buffers.poll()) != null)) {
+                    final int bytesFreed = next.remaining();
+                    s.onNext(next);
+                    buffersSent = true;
+                    requests.decrementAndGet();
+                    windowScalingIncrement.addAndGet(bytesFreed);
+                }
+                final CapacityChannel localChannel = capacityChannel;
+                if (localChannel != null) {
+                    try {
+                        signalCapacity(localChannel);
+                    } catch (final IOException e) {
+                        exception = e;
+                        s.onError(e);
+                        return;
+                    }
+                }
+                if (completed && buffers.isEmpty()) {
+                    subscriber = null;
+                    s.onComplete();
+                }
+            } finally {
+                flushInProgress.set(false);
             }
-            if (completed && buffers.isEmpty()) {
-                subscriber = null;
-                s.onComplete();
-            }
-        } finally {
-            flushInProgress.set(false);
         }
     }