You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2019/05/23 10:54:18 UTC
[httpcomponents-core] 01/01: Basic message consumer simplifications
and improvements
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a commit to branch message-support
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git
commit d31f4ff5c3f44debb66730788038178d7e1c39a9
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Wed May 22 16:12:16 2019 +0200
Basic message consumer simplifications and improvements
---
.../core5/reactive/ReactiveResponseConsumer.java | 5 --
.../apache/hc/core5/benchmark/BenchmarkWorker.java | 5 --
.../core5/testing/nio/MessageExchangeHandler.java | 5 ++
.../hc/core5/http/nio/AsyncRequestConsumer.java | 8 ---
.../hc/core5/http/nio/AsyncResponseConsumer.java | 8 ---
.../hc/core5/http/nio/BasicRequestConsumer.java | 47 ++++++++++-------
.../hc/core5/http/nio/BasicResponseConsumer.java | 43 ++++++++++-----
.../support/AbstractAsyncRequesterConsumer.java | 51 ++++++++++++------
.../nio/support/AbstractAsyncResponseConsumer.java | 61 ++++++++++++----------
9 files changed, 130 insertions(+), 103 deletions(-)
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
index 4300e3d..980194a 100644
--- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
@@ -142,11 +142,6 @@ public final class ReactiveResponseConsumer implements AsyncResponseConsumer<Voi
}
@Override
- public Void getResult() {
- return null;
- }
-
- @Override
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
reactiveDataConsumer.updateCapacity(capacityChannel);
}
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/benchmark/BenchmarkWorker.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/benchmark/BenchmarkWorker.java
index a254653..ce10276 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/benchmark/BenchmarkWorker.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/benchmark/BenchmarkWorker.java
@@ -255,11 +255,6 @@ class BenchmarkWorker implements ResourceHolder {
}
@Override
- public Void getResult() {
- return null;
- }
-
- @Override
public void failed(final Exception cause) {
stats.incFailureCount();
final FutureCallback<Void> resultCallback = resultCallbackRef.getAndSet(null);
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MessageExchangeHandler.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MessageExchangeHandler.java
index 0e7565e..8031412 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MessageExchangeHandler.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MessageExchangeHandler.java
@@ -26,6 +26,7 @@
*/
package org.apache.hc.core5.testing.nio;
+import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
@@ -48,6 +49,10 @@ public abstract class MessageExchangeHandler<T> extends AbstractServerExchangeHa
this.requestConsumer = requestConsumer;
}
+ public MessageExchangeHandler(final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier) {
+ this(new BasicRequestConsumer<>(dataConsumerSupplier));
+ }
+
public MessageExchangeHandler(final AsyncEntityConsumer<T> entityConsumer) {
this(new BasicRequestConsumer<>(entityConsumer));
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestConsumer.java
index c4220f4..89f58a4 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestConsumer.java
@@ -63,12 +63,4 @@ public interface AsyncRequestConsumer<T> extends AsyncDataConsumer {
*/
void failed(Exception cause);
- /**
- * Returns the result of request processing when it becomes available or {@code null}
- * if the request is still being received.
- *
- * @return the request processing result.
- */
- T getResult();
-
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseConsumer.java
index 9c14b3d..e5a0be6 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseConsumer.java
@@ -71,12 +71,4 @@ public interface AsyncResponseConsumer<T> extends AsyncDataConsumer {
*/
void failed(Exception cause);
- /**
- * Returns the result of response processing when it becomes available or {@code null}
- * if the response is still being received.
- *
- * @return the response processing result.
- */
- T getResult();
-
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestConsumer.java
index 052b67d..7be827f 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestConsumer.java
@@ -29,8 +29,10 @@ package org.apache.hc.core5.http.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
@@ -38,7 +40,6 @@ import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.Args;
-import org.apache.hc.core5.util.Asserts;
/**
* Basic implementation of {@link AsyncRequestConsumer} that represents the request message as
@@ -48,12 +49,23 @@ import org.apache.hc.core5.util.Asserts;
*/
public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<HttpRequest, T>> {
- private final AsyncEntityConsumer<T> dataConsumer;
+ private final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier;
+ private final AtomicReference<AsyncEntityConsumer<T>> dataConsumerRef;
- private volatile Message<HttpRequest, T> result;
+ public BasicRequestConsumer(final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier) {
+ this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
+ this.dataConsumerRef = new AtomicReference<>(null);
+ }
public BasicRequestConsumer(final AsyncEntityConsumer<T> dataConsumer) {
- this.dataConsumer = dataConsumer;
+ this(new Supplier<AsyncEntityConsumer<T>>() {
+
+ @Override
+ public AsyncEntityConsumer<T> get() {
+ return dataConsumer;
+ }
+
+ });
}
@Override
@@ -64,16 +76,20 @@ public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<Htt
final FutureCallback<Message<HttpRequest, T>> resultCallback) throws HttpException, IOException {
Args.notNull(request, "Request");
if (entityDetails != null) {
- Asserts.notNull(dataConsumer, "Data consumer");
+ final AsyncEntityConsumer<T> dataConsumer = dataConsumerSupplier.get();
+ if (dataConsumer == null) {
+ throw new HttpException("Supplied data consumer is null");
+ }
+ dataConsumerRef.set(dataConsumer);
+
dataConsumer.streamStart(entityDetails, new FutureCallback<T>() {
@Override
public void completed(final T body) {
- result = new Message<>(request, body);
+ final Message<HttpRequest, T> result = new Message<>(request, body);
if (resultCallback != null) {
resultCallback.completed(result);
}
- dataConsumer.releaseResources();
}
@Override
@@ -81,7 +97,6 @@ public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<Htt
if (resultCallback != null) {
resultCallback.failed(ex);
}
- dataConsumer.releaseResources();
}
@Override
@@ -89,34 +104,32 @@ public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<Htt
if (resultCallback != null) {
resultCallback.cancelled();
}
- dataConsumer.releaseResources();
}
});
} else {
- result = new Message<>(request, null);
+ final Message<HttpRequest, T> result = new Message<>(request, null);
if (resultCallback != null) {
resultCallback.completed(result);
}
- releaseResources();
}
}
@Override
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
- Asserts.notNull(dataConsumer, "Data consumer");
+ final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
dataConsumer.updateCapacity(capacityChannel);
}
@Override
public void consume(final ByteBuffer src) throws IOException {
- Asserts.notNull(dataConsumer, "Data consumer");
+ final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
dataConsumer.consume(src);
}
@Override
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
- Asserts.notNull(dataConsumer, "Data consumer");
+ final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
dataConsumer.streamEnd(trailers);
}
@@ -126,12 +139,8 @@ public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<Htt
}
@Override
- public Message<HttpRequest, T> getResult() {
- return result;
- }
-
- @Override
public void releaseResources() {
+ final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.getAndSet(null);
if (dataConsumer != null) {
dataConsumer.releaseResources();
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseConsumer.java
index 1aa31b8..2501985 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseConsumer.java
@@ -29,8 +29,10 @@ package org.apache.hc.core5.http.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
@@ -48,12 +50,23 @@ import org.apache.hc.core5.util.Args;
*/
public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<HttpResponse, T>> {
- private final AsyncEntityConsumer<T> dataConsumer;
+ private final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier;
+ private final AtomicReference<AsyncEntityConsumer<T>> dataConsumerRef;
- private volatile Message<HttpResponse, T> result;
+ public BasicResponseConsumer(final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier) {
+ this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
+ this.dataConsumerRef = new AtomicReference<>(null);
+ }
public BasicResponseConsumer(final AsyncEntityConsumer<T> dataConsumer) {
- this.dataConsumer = Args.notNull(dataConsumer, "Consumer");
+ this(new Supplier<AsyncEntityConsumer<T>>() {
+
+ @Override
+ public AsyncEntityConsumer<T> get() {
+ return dataConsumer;
+ }
+
+ });
}
@Override
@@ -64,15 +77,19 @@ public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<H
Args.notNull(response, "Response");
if (entityDetails != null) {
+ final AsyncEntityConsumer<T> dataConsumer = dataConsumerSupplier.get();
+ if (dataConsumer == null) {
+ throw new HttpException("Supplied data consumer is null");
+ }
+ dataConsumerRef.set(dataConsumer);
dataConsumer.streamStart(entityDetails, new FutureCallback<T>() {
@Override
public void completed(final T body) {
- result = new Message<>(response, body);
+ final Message<HttpResponse, T> result = new Message<>(response, body);
if (resultCallback != null) {
resultCallback.completed(result);
}
- dataConsumer.releaseResources();
}
@Override
@@ -91,11 +108,10 @@ public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<H
});
} else {
- result = new Message<>(response, null);
+ final Message<HttpResponse, T> result = new Message<>(response, null);
if (resultCallback != null) {
resultCallback.completed(result);
}
- dataConsumer.releaseResources();
}
}
@@ -105,16 +121,19 @@ public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<H
@Override
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
dataConsumer.updateCapacity(capacityChannel);
}
@Override
public void consume(final ByteBuffer src) throws IOException {
+ final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
dataConsumer.consume(src);
}
@Override
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+ final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
dataConsumer.streamEnd(trailers);
}
@@ -124,13 +143,11 @@ public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<H
}
@Override
- public Message<HttpResponse, T> getResult() {
- return result;
- }
-
- @Override
public void releaseResources() {
- dataConsumer.releaseResources();
+ final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.getAndSet(null);
+ if (dataConsumer != null) {
+ dataConsumer.releaseResources();
+ }
}
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncRequesterConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncRequesterConsumer.java
index 3864a08..da6b861 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncRequesterConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncRequesterConsumer.java
@@ -30,8 +30,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.UnsupportedCharsetException;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
@@ -54,13 +56,23 @@ import org.apache.hc.core5.util.Args;
*/
public abstract class AbstractAsyncRequesterConsumer<T, E> implements AsyncRequestConsumer<T> {
- private final AsyncEntityConsumer<E> entityConsumer;
+ private final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier;
+ private final AtomicReference<AsyncEntityConsumer<E>> dataConsumerRef;
- private volatile T result;
+ public AbstractAsyncRequesterConsumer(final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier) {
+ this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
+ this.dataConsumerRef = new AtomicReference<>(null);
+ }
+
+ public AbstractAsyncRequesterConsumer(final AsyncEntityConsumer<E> dataConsumer) {
+ this(new Supplier<AsyncEntityConsumer<E>>() {
- public AbstractAsyncRequesterConsumer(final AsyncEntityConsumer<E> entityConsumer) {
- Args.notNull(entityConsumer, "Entity consumer");
- this.entityConsumer = entityConsumer;
+ @Override
+ public AsyncEntityConsumer<E> get() {
+ return dataConsumer;
+ }
+
+ });
}
/**
@@ -78,14 +90,19 @@ public abstract class AbstractAsyncRequesterConsumer<T, E> implements AsyncReque
final EntityDetails entityDetails,
final HttpContext httpContext, final FutureCallback<T> resultCallback) throws HttpException, IOException {
if (entityDetails != null) {
- entityConsumer.streamStart(entityDetails, new FutureCallback<E>() {
+ final AsyncEntityConsumer<E> dataConsumer = dataConsumerSupplier.get();
+ if (dataConsumer == null) {
+ throw new HttpException("Supplied data consumer is null");
+ }
+ dataConsumerRef.set(dataConsumer);
+ dataConsumer.streamStart(entityDetails, new FutureCallback<E>() {
@Override
public void completed(final E entity) {
final ContentType contentType;
try {
contentType = ContentType.parse(entityDetails.getContentType());
- result = buildResult(request, entity, contentType);
+ final T result = buildResult(request, entity, contentType);
resultCallback.completed(result);
} catch (final UnsupportedCharsetException ex) {
resultCallback.failed(ex);
@@ -105,29 +122,26 @@ public abstract class AbstractAsyncRequesterConsumer<T, E> implements AsyncReque
});
} else {
resultCallback.completed(buildResult(request, null, null));
- entityConsumer.releaseResources();
}
}
@Override
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
- entityConsumer.updateCapacity(capacityChannel);
+ final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+ dataConsumer.updateCapacity(capacityChannel);
}
@Override
public final void consume(final ByteBuffer src) throws IOException {
- entityConsumer.consume(src);
+ final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+ dataConsumer.consume(src);
}
@Override
public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
- entityConsumer.streamEnd(trailers);
- }
-
- @Override
- public T getResult() {
- return result;
+ final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+ dataConsumer.streamEnd(trailers);
}
@Override
@@ -137,7 +151,10 @@ public abstract class AbstractAsyncRequesterConsumer<T, E> implements AsyncReque
@Override
public final void releaseResources() {
- entityConsumer.releaseResources();
+ final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.getAndSet(null);
+ if (dataConsumer != null) {
+ dataConsumer.releaseResources();
+ }
}
}
\ No newline at end of file
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
index 0472f8f..9012ee2 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
@@ -55,15 +56,23 @@ import org.apache.hc.core5.util.Args;
*/
public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncResponseConsumer<T> {
- private final AsyncEntityConsumer<E> entityConsumer;
- private final AtomicReference<T> resultRef;
- private final AtomicReference<Exception> exceptionRef;
+ private final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier;
+ private final AtomicReference<AsyncEntityConsumer<E>> dataConsumerRef;
- public AbstractAsyncResponseConsumer(final AsyncEntityConsumer<E> entityConsumer) {
- Args.notNull(entityConsumer, "Entity consumer");
- this.entityConsumer = entityConsumer;
- this.resultRef = new AtomicReference<>(null);
- this.exceptionRef = new AtomicReference<>(null);
+ public AbstractAsyncResponseConsumer(final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier) {
+ this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
+ this.dataConsumerRef = new AtomicReference<>(null);
+ }
+
+ public AbstractAsyncResponseConsumer(final AsyncEntityConsumer<E> dataConsumer) {
+ this(new Supplier<AsyncEntityConsumer<E>>() {
+
+ @Override
+ public AsyncEntityConsumer<E> get() {
+ return dataConsumer;
+ }
+
+ });
}
/**
@@ -81,7 +90,12 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
final EntityDetails entityDetails,
final HttpContext httpContext, final FutureCallback<T> resultCallback) throws HttpException, IOException {
if (entityDetails != null) {
- entityConsumer.streamStart(entityDetails, new FutureCallback<E>() {
+ final AsyncEntityConsumer<E> dataConsumer = dataConsumerSupplier.get();
+ if (dataConsumer == null) {
+ throw new HttpException("Supplied data consumer is null");
+ }
+ dataConsumerRef.set(dataConsumer);
+ dataConsumer.streamStart(entityDetails, new FutureCallback<E>() {
@Override
public void completed(final E entity) {
@@ -89,12 +103,10 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
try {
contentType = ContentType.parse(entityDetails.getContentType());
final T result = buildResult(response, entity, contentType);
- resultRef.compareAndSet(null, result);
if (resultCallback != null) {
resultCallback.completed(result);
}
} catch (final UnsupportedCharsetException ex) {
- exceptionRef.compareAndSet(null, ex);
if (resultCallback != null) {
resultCallback.failed(ex);
}
@@ -103,7 +115,6 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
@Override
public void failed(final Exception ex) {
- exceptionRef.compareAndSet(null, ex);
if (resultCallback != null) {
resultCallback.failed(ex);
}
@@ -119,48 +130,42 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
});
} else {
final T result = buildResult(response, null, null);
- resultRef.compareAndSet(null, result);
if (resultCallback != null) {
resultCallback.completed(result);
}
- entityConsumer.releaseResources();
}
}
@Override
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
- entityConsumer.updateCapacity(capacityChannel);
+ final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+ dataConsumer.updateCapacity(capacityChannel);
}
@Override
public final void consume(final ByteBuffer src) throws IOException {
- entityConsumer.consume(src);
+ final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+ dataConsumer.consume(src);
}
@Override
public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
- entityConsumer.streamEnd(trailers);
- }
-
- @Override
- public T getResult() {
- return resultRef.get();
- }
-
- public Exception getException() {
- return exceptionRef.get();
+ final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+ dataConsumer.streamEnd(trailers);
}
@Override
public final void failed(final Exception cause) {
- exceptionRef.compareAndSet(null, cause);
releaseResources();
}
@Override
public final void releaseResources() {
- entityConsumer.releaseResources();
+ final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.getAndSet(null);
+ if (dataConsumer != null) {
+ dataConsumer.releaseResources();
+ }
}
}
\ No newline at end of file