You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2019/05/23 10:54:18 UTC

[httpcomponents-core] 01/01: Basic message consumer simplifications and improvements

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch message-support
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit d31f4ff5c3f44debb66730788038178d7e1c39a9
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Wed May 22 16:12:16 2019 +0200

    Basic message consumer simplifications and improvements
---
 .../core5/reactive/ReactiveResponseConsumer.java   |  5 --
 .../apache/hc/core5/benchmark/BenchmarkWorker.java |  5 --
 .../core5/testing/nio/MessageExchangeHandler.java  |  5 ++
 .../hc/core5/http/nio/AsyncRequestConsumer.java    |  8 ---
 .../hc/core5/http/nio/AsyncResponseConsumer.java   |  8 ---
 .../hc/core5/http/nio/BasicRequestConsumer.java    | 47 ++++++++++-------
 .../hc/core5/http/nio/BasicResponseConsumer.java   | 43 ++++++++++-----
 .../support/AbstractAsyncRequesterConsumer.java    | 51 ++++++++++++------
 .../nio/support/AbstractAsyncResponseConsumer.java | 61 ++++++++++++----------
 9 files changed, 130 insertions(+), 103 deletions(-)

diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
index 4300e3d..980194a 100644
--- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
@@ -142,11 +142,6 @@ public final class ReactiveResponseConsumer implements AsyncResponseConsumer<Voi
     }
 
     @Override
-    public Void getResult() {
-        return null;
-    }
-
-    @Override
     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
         reactiveDataConsumer.updateCapacity(capacityChannel);
     }
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/benchmark/BenchmarkWorker.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/benchmark/BenchmarkWorker.java
index a254653..ce10276 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/benchmark/BenchmarkWorker.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/benchmark/BenchmarkWorker.java
@@ -255,11 +255,6 @@ class BenchmarkWorker implements ResourceHolder {
             }
 
             @Override
-            public Void getResult() {
-                return null;
-            }
-
-            @Override
             public void failed(final Exception cause) {
                 stats.incFailureCount();
                 final FutureCallback<Void> resultCallback = resultCallbackRef.getAndSet(null);
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MessageExchangeHandler.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MessageExchangeHandler.java
index 0e7565e..8031412 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MessageExchangeHandler.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MessageExchangeHandler.java
@@ -26,6 +26,7 @@
  */
 package org.apache.hc.core5.testing.nio;
 
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpRequest;
@@ -48,6 +49,10 @@ public abstract class MessageExchangeHandler<T> extends AbstractServerExchangeHa
         this.requestConsumer = requestConsumer;
     }
 
+    public MessageExchangeHandler(final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier) {
+        this(new BasicRequestConsumer<>(dataConsumerSupplier));
+    }
+
     public MessageExchangeHandler(final AsyncEntityConsumer<T> entityConsumer) {
         this(new BasicRequestConsumer<>(entityConsumer));
     }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestConsumer.java
index c4220f4..89f58a4 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestConsumer.java
@@ -63,12 +63,4 @@ public interface AsyncRequestConsumer<T> extends AsyncDataConsumer {
      */
     void failed(Exception cause);
 
-    /**
-     * Returns the result of request processing when it becomes available or {@code null}
-     * if the request is still being received.
-     *
-     * @return the request processing result.
-     */
-    T getResult();
-
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseConsumer.java
index 9c14b3d..e5a0be6 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseConsumer.java
@@ -71,12 +71,4 @@ public interface AsyncResponseConsumer<T> extends AsyncDataConsumer {
      */
     void failed(Exception cause);
 
-    /**
-     * Returns the result of response processing when it becomes available or {@code null}
-     * if the response is still being received.
-     *
-     * @return the response processing result.
-     */
-    T getResult();
-
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestConsumer.java
index 052b67d..7be827f 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestConsumer.java
@@ -29,8 +29,10 @@ package org.apache.hc.core5.http.nio;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
@@ -38,7 +40,6 @@ import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.Message;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.util.Args;
-import org.apache.hc.core5.util.Asserts;
 
 /**
  * Basic implementation of {@link AsyncRequestConsumer} that represents the request message as
@@ -48,12 +49,23 @@ import org.apache.hc.core5.util.Asserts;
  */
 public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<HttpRequest, T>> {
 
-    private final AsyncEntityConsumer<T> dataConsumer;
+    private final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier;
+    private final AtomicReference<AsyncEntityConsumer<T>> dataConsumerRef;
 
-    private volatile Message<HttpRequest, T> result;
+    public BasicRequestConsumer(final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier) {
+        this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
+        this.dataConsumerRef = new AtomicReference<>(null);
+    }
 
     public BasicRequestConsumer(final AsyncEntityConsumer<T> dataConsumer) {
-        this.dataConsumer = dataConsumer;
+        this(new Supplier<AsyncEntityConsumer<T>>() {
+
+            @Override
+            public AsyncEntityConsumer<T> get() {
+                return dataConsumer;
+            }
+
+        });
     }
 
     @Override
@@ -64,16 +76,20 @@ public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<Htt
             final FutureCallback<Message<HttpRequest, T>> resultCallback) throws HttpException, IOException {
         Args.notNull(request, "Request");
         if (entityDetails != null) {
-            Asserts.notNull(dataConsumer, "Data consumer");
+            final AsyncEntityConsumer<T> dataConsumer = dataConsumerSupplier.get();
+            if (dataConsumer == null) {
+                throw new HttpException("Supplied data consumer is null");
+            }
+            dataConsumerRef.set(dataConsumer);
+
             dataConsumer.streamStart(entityDetails, new FutureCallback<T>() {
 
                 @Override
                 public void completed(final T body) {
-                    result = new Message<>(request, body);
+                    final Message<HttpRequest, T> result = new Message<>(request, body);
                     if (resultCallback != null) {
                         resultCallback.completed(result);
                     }
-                    dataConsumer.releaseResources();
                 }
 
                 @Override
@@ -81,7 +97,6 @@ public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<Htt
                     if (resultCallback != null) {
                         resultCallback.failed(ex);
                     }
-                    dataConsumer.releaseResources();
                 }
 
                 @Override
@@ -89,34 +104,32 @@ public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<Htt
                     if (resultCallback != null) {
                         resultCallback.cancelled();
                     }
-                    dataConsumer.releaseResources();
                 }
 
             });
         } else {
-            result = new Message<>(request, null);
+            final Message<HttpRequest, T> result = new Message<>(request, null);
             if (resultCallback != null) {
                 resultCallback.completed(result);
             }
-            releaseResources();
         }
     }
 
     @Override
     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
-        Asserts.notNull(dataConsumer, "Data consumer");
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
         dataConsumer.updateCapacity(capacityChannel);
     }
 
     @Override
     public void consume(final ByteBuffer src) throws IOException {
-        Asserts.notNull(dataConsumer, "Data consumer");
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
         dataConsumer.consume(src);
     }
 
     @Override
     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
-        Asserts.notNull(dataConsumer, "Data consumer");
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
         dataConsumer.streamEnd(trailers);
     }
 
@@ -126,12 +139,8 @@ public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<Htt
     }
 
     @Override
-    public Message<HttpRequest, T> getResult() {
-        return result;
-    }
-
-    @Override
     public void releaseResources() {
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.getAndSet(null);
         if (dataConsumer != null) {
             dataConsumer.releaseResources();
         }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseConsumer.java
index 1aa31b8..2501985 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseConsumer.java
@@ -29,8 +29,10 @@ package org.apache.hc.core5.http.nio;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
@@ -48,12 +50,23 @@ import org.apache.hc.core5.util.Args;
  */
 public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<HttpResponse, T>> {
 
-    private final AsyncEntityConsumer<T> dataConsumer;
+    private final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier;
+    private final AtomicReference<AsyncEntityConsumer<T>> dataConsumerRef;
 
-    private volatile Message<HttpResponse, T> result;
+    public BasicResponseConsumer(final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier) {
+        this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
+        this.dataConsumerRef = new AtomicReference<>(null);
+    }
 
     public BasicResponseConsumer(final AsyncEntityConsumer<T> dataConsumer) {
-        this.dataConsumer = Args.notNull(dataConsumer, "Consumer");
+        this(new Supplier<AsyncEntityConsumer<T>>() {
+
+            @Override
+            public AsyncEntityConsumer<T> get() {
+                return dataConsumer;
+            }
+
+        });
     }
 
     @Override
@@ -64,15 +77,19 @@ public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<H
         Args.notNull(response, "Response");
 
         if (entityDetails != null) {
+            final AsyncEntityConsumer<T> dataConsumer = dataConsumerSupplier.get();
+            if (dataConsumer == null) {
+                throw new HttpException("Supplied data consumer is null");
+            }
+            dataConsumerRef.set(dataConsumer);
             dataConsumer.streamStart(entityDetails, new FutureCallback<T>() {
 
                 @Override
                 public void completed(final T body) {
-                    result = new Message<>(response, body);
+                    final Message<HttpResponse, T> result = new Message<>(response, body);
                     if (resultCallback != null) {
                         resultCallback.completed(result);
                     }
-                    dataConsumer.releaseResources();
                 }
 
                 @Override
@@ -91,11 +108,10 @@ public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<H
 
             });
         } else {
-            result = new Message<>(response, null);
+            final Message<HttpResponse, T> result = new Message<>(response, null);
             if (resultCallback != null) {
                 resultCallback.completed(result);
             }
-            dataConsumer.releaseResources();
         }
     }
 
@@ -105,16 +121,19 @@ public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<H
 
     @Override
     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
         dataConsumer.updateCapacity(capacityChannel);
     }
 
     @Override
     public void consume(final ByteBuffer src) throws IOException {
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
         dataConsumer.consume(src);
     }
 
     @Override
     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
         dataConsumer.streamEnd(trailers);
     }
 
@@ -124,13 +143,11 @@ public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<H
     }
 
     @Override
-    public Message<HttpResponse, T> getResult() {
-        return result;
-    }
-
-    @Override
     public void releaseResources() {
-        dataConsumer.releaseResources();
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.getAndSet(null);
+        if (dataConsumer != null) {
+            dataConsumer.releaseResources();
+        }
     }
 
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncRequesterConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncRequesterConsumer.java
index 3864a08..da6b861 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncRequesterConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncRequesterConsumer.java
@@ -30,8 +30,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.UnsupportedCharsetException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
@@ -54,13 +56,23 @@ import org.apache.hc.core5.util.Args;
  */
 public abstract class AbstractAsyncRequesterConsumer<T, E> implements AsyncRequestConsumer<T> {
 
-    private final AsyncEntityConsumer<E> entityConsumer;
+    private final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier;
+    private final AtomicReference<AsyncEntityConsumer<E>> dataConsumerRef;
 
-    private volatile T result;
+    public AbstractAsyncRequesterConsumer(final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier) {
+        this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
+        this.dataConsumerRef = new AtomicReference<>(null);
+    }
+
+    public AbstractAsyncRequesterConsumer(final AsyncEntityConsumer<E> dataConsumer) {
+        this(new Supplier<AsyncEntityConsumer<E>>() {
 
-    public AbstractAsyncRequesterConsumer(final AsyncEntityConsumer<E> entityConsumer) {
-        Args.notNull(entityConsumer, "Entity consumer");
-        this.entityConsumer = entityConsumer;
+            @Override
+            public AsyncEntityConsumer<E> get() {
+                return dataConsumer;
+            }
+
+        });
     }
 
     /**
@@ -78,14 +90,19 @@ public abstract class AbstractAsyncRequesterConsumer<T, E> implements AsyncReque
             final EntityDetails entityDetails,
             final HttpContext httpContext, final FutureCallback<T> resultCallback) throws HttpException, IOException {
         if (entityDetails != null) {
-            entityConsumer.streamStart(entityDetails, new FutureCallback<E>() {
+            final AsyncEntityConsumer<E> dataConsumer = dataConsumerSupplier.get();
+            if (dataConsumer == null) {
+                throw new HttpException("Supplied data consumer is null");
+            }
+            dataConsumerRef.set(dataConsumer);
+            dataConsumer.streamStart(entityDetails, new FutureCallback<E>() {
 
                 @Override
                 public void completed(final E entity) {
                     final ContentType contentType;
                     try {
                         contentType = ContentType.parse(entityDetails.getContentType());
-                        result = buildResult(request, entity, contentType);
+                        final T result = buildResult(request, entity, contentType);
                         resultCallback.completed(result);
                     } catch (final UnsupportedCharsetException ex) {
                         resultCallback.failed(ex);
@@ -105,29 +122,26 @@ public abstract class AbstractAsyncRequesterConsumer<T, E> implements AsyncReque
             });
         } else {
             resultCallback.completed(buildResult(request, null, null));
-            entityConsumer.releaseResources();
         }
 
     }
 
     @Override
     public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
-        entityConsumer.updateCapacity(capacityChannel);
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+        dataConsumer.updateCapacity(capacityChannel);
     }
 
     @Override
     public final void consume(final ByteBuffer src) throws IOException {
-        entityConsumer.consume(src);
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+        dataConsumer.consume(src);
     }
 
     @Override
     public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
-        entityConsumer.streamEnd(trailers);
-    }
-
-    @Override
-    public T getResult() {
-        return result;
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+        dataConsumer.streamEnd(trailers);
     }
 
     @Override
@@ -137,7 +151,10 @@ public abstract class AbstractAsyncRequesterConsumer<T, E> implements AsyncReque
 
     @Override
     public final void releaseResources() {
-        entityConsumer.releaseResources();
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.getAndSet(null);
+        if (dataConsumer != null) {
+            dataConsumer.releaseResources();
+        }
     }
 
 }
\ No newline at end of file
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
index 0472f8f..9012ee2 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
@@ -55,15 +56,23 @@ import org.apache.hc.core5.util.Args;
  */
 public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncResponseConsumer<T> {
 
-    private final AsyncEntityConsumer<E> entityConsumer;
-    private final AtomicReference<T> resultRef;
-    private final AtomicReference<Exception> exceptionRef;
+    private final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier;
+    private final AtomicReference<AsyncEntityConsumer<E>> dataConsumerRef;
 
-    public AbstractAsyncResponseConsumer(final AsyncEntityConsumer<E> entityConsumer) {
-        Args.notNull(entityConsumer, "Entity consumer");
-        this.entityConsumer = entityConsumer;
-        this.resultRef = new AtomicReference<>(null);
-        this.exceptionRef = new AtomicReference<>(null);
+    public AbstractAsyncResponseConsumer(final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier) {
+        this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
+        this.dataConsumerRef = new AtomicReference<>(null);
+    }
+
+    public AbstractAsyncResponseConsumer(final AsyncEntityConsumer<E> dataConsumer) {
+        this(new Supplier<AsyncEntityConsumer<E>>() {
+
+            @Override
+            public AsyncEntityConsumer<E> get() {
+                return dataConsumer;
+            }
+
+        });
     }
 
     /**
@@ -81,7 +90,12 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
             final EntityDetails entityDetails,
             final HttpContext httpContext, final FutureCallback<T> resultCallback) throws HttpException, IOException {
         if (entityDetails != null) {
-            entityConsumer.streamStart(entityDetails, new FutureCallback<E>() {
+            final AsyncEntityConsumer<E> dataConsumer = dataConsumerSupplier.get();
+            if (dataConsumer == null) {
+                throw new HttpException("Supplied data consumer is null");
+            }
+            dataConsumerRef.set(dataConsumer);
+            dataConsumer.streamStart(entityDetails, new FutureCallback<E>() {
 
                 @Override
                 public void completed(final E entity) {
@@ -89,12 +103,10 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
                     try {
                         contentType = ContentType.parse(entityDetails.getContentType());
                         final T result = buildResult(response, entity, contentType);
-                        resultRef.compareAndSet(null, result);
                         if (resultCallback != null) {
                             resultCallback.completed(result);
                         }
                     } catch (final UnsupportedCharsetException ex) {
-                        exceptionRef.compareAndSet(null, ex);
                         if (resultCallback != null) {
                             resultCallback.failed(ex);
                         }
@@ -103,7 +115,6 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
 
                 @Override
                 public void failed(final Exception ex) {
-                    exceptionRef.compareAndSet(null, ex);
                     if (resultCallback != null) {
                         resultCallback.failed(ex);
                     }
@@ -119,48 +130,42 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
             });
         } else {
             final T result = buildResult(response, null, null);
-            resultRef.compareAndSet(null, result);
             if (resultCallback != null) {
                 resultCallback.completed(result);
             }
-            entityConsumer.releaseResources();
         }
 
     }
 
     @Override
     public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
-        entityConsumer.updateCapacity(capacityChannel);
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+        dataConsumer.updateCapacity(capacityChannel);
     }
 
     @Override
     public final void consume(final ByteBuffer src) throws IOException {
-        entityConsumer.consume(src);
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+        dataConsumer.consume(src);
     }
 
     @Override
     public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
-        entityConsumer.streamEnd(trailers);
-    }
-
-    @Override
-    public T getResult() {
-        return resultRef.get();
-    }
-
-    public Exception getException() {
-        return exceptionRef.get();
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+        dataConsumer.streamEnd(trailers);
     }
 
     @Override
     public final void failed(final Exception cause) {
-        exceptionRef.compareAndSet(null, cause);
         releaseResources();
     }
 
     @Override
     public final void releaseResources() {
-        entityConsumer.releaseResources();
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.getAndSet(null);
+        if (dataConsumer != null) {
+            dataConsumer.releaseResources();
+        }
     }
 
 }
\ No newline at end of file