You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by rs...@apache.org on 2019/12/23 20:02:39 UTC
[httpcomponents-core] branch master updated: ReactiveDataConsumer: Synchronize `flushToSubscriber`
This is an automated email from the ASF dual-hosted git repository.
rschmitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git
The following commit(s) were added to refs/heads/master by this push:
new c5f6e5f ReactiveDataConsumer: Synchronize `flushToSubscriber`
c5f6e5f is described below
commit c5f6e5f3a8cae07895026a9ccb87450f60e9e9c4
Author: Ryan Schmitt <rs...@apache.org>
AuthorDate: Sun Dec 22 15:53:07 2019 -0800
ReactiveDataConsumer: Synchronize `flushToSubscriber`
It is possible for two threads to race in such a way that a call to a
method like `ReactiveDataConsumer#streamEnd` will never result in a call
to `Subscriber#onComplete` from either thread. These races are solved by
synchronizing the `flushToSubscriber` method, which is currently only
protected against reentrant calls from the same thread.
---
.../hc/core5/reactive/ReactiveDataConsumer.java | 67 +++++++++++-----------
1 file changed, 35 insertions(+), 32 deletions(-)
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
index fc7c0f5..755c336 100644
--- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
@@ -58,6 +58,7 @@ final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher<ByteBuf
private final BlockingQueue<ByteBuffer> buffers = new LinkedBlockingQueue<>();
private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
+ private final Object flushLock = new Object();
private final AtomicInteger windowScalingIncrement = new AtomicInteger(0);
private volatile boolean buffersSent = false;
private volatile boolean cancelled = false;
@@ -128,43 +129,45 @@ final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher<ByteBuf
}
private void flushToSubscriber() {
- final Subscriber<? super ByteBuffer> s = subscriber;
- if (flushInProgress.getAndSet(true)) {
- return;
- }
- try {
- if (s == null) {
- return;
- }
- if (exception != null) {
- subscriber = null;
- s.onError(exception);
+ synchronized (flushLock) {
+ final Subscriber<? super ByteBuffer> s = subscriber;
+ if (flushInProgress.getAndSet(true)) {
return;
}
- ByteBuffer next;
- while (requests.get() > 0 && ((next = buffers.poll()) != null)) {
- final int bytesFreed = next.remaining();
- s.onNext(next);
- buffersSent = true;
- requests.decrementAndGet();
- windowScalingIncrement.addAndGet(bytesFreed);
- }
- final CapacityChannel localChannel = capacityChannel;
- if (localChannel != null) {
- try {
- signalCapacity(localChannel);
- } catch (final IOException e) {
- exception = e;
- s.onError(e);
+ try {
+ if (s == null) {
return;
}
+ if (exception != null) {
+ subscriber = null;
+ s.onError(exception);
+ return;
+ }
+ ByteBuffer next;
+ while (requests.get() > 0 && ((next = buffers.poll()) != null)) {
+ final int bytesFreed = next.remaining();
+ s.onNext(next);
+ buffersSent = true;
+ requests.decrementAndGet();
+ windowScalingIncrement.addAndGet(bytesFreed);
+ }
+ final CapacityChannel localChannel = capacityChannel;
+ if (localChannel != null) {
+ try {
+ signalCapacity(localChannel);
+ } catch (final IOException e) {
+ exception = e;
+ s.onError(e);
+ return;
+ }
+ }
+ if (completed && buffers.isEmpty()) {
+ subscriber = null;
+ s.onComplete();
+ }
+ } finally {
+ flushInProgress.set(false);
}
- if (completed && buffers.isEmpty()) {
- subscriber = null;
- s.onComplete();
- }
- } finally {
- flushInProgress.set(false);
}
}