You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2019/05/23 10:54:17 UTC

[httpcomponents-core] branch message-support created (now d31f4ff)

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a change to branch message-support
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git.


      at d31f4ff  Basic message consumer simplifications and improvements

This branch includes the following new commits:

     new d31f4ff  Basic message consumer simplifications and improvements

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[httpcomponents-core] 01/01: Basic message consumer simplifications and improvements

Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch message-support
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit d31f4ff5c3f44debb66730788038178d7e1c39a9
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Wed May 22 16:12:16 2019 +0200

    Basic message consumer simplifications and improvements
---
 .../core5/reactive/ReactiveResponseConsumer.java   |  5 --
 .../apache/hc/core5/benchmark/BenchmarkWorker.java |  5 --
 .../core5/testing/nio/MessageExchangeHandler.java  |  5 ++
 .../hc/core5/http/nio/AsyncRequestConsumer.java    |  8 ---
 .../hc/core5/http/nio/AsyncResponseConsumer.java   |  8 ---
 .../hc/core5/http/nio/BasicRequestConsumer.java    | 47 ++++++++++-------
 .../hc/core5/http/nio/BasicResponseConsumer.java   | 43 ++++++++++-----
 .../support/AbstractAsyncRequesterConsumer.java    | 51 ++++++++++++------
 .../nio/support/AbstractAsyncResponseConsumer.java | 61 ++++++++++++----------
 9 files changed, 130 insertions(+), 103 deletions(-)

diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
index 4300e3d..980194a 100644
--- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
@@ -142,11 +142,6 @@ public final class ReactiveResponseConsumer implements AsyncResponseConsumer<Voi
     }
 
     @Override
-    public Void getResult() {
-        return null;
-    }
-
-    @Override
     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
         reactiveDataConsumer.updateCapacity(capacityChannel);
     }
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/benchmark/BenchmarkWorker.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/benchmark/BenchmarkWorker.java
index a254653..ce10276 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/benchmark/BenchmarkWorker.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/benchmark/BenchmarkWorker.java
@@ -255,11 +255,6 @@ class BenchmarkWorker implements ResourceHolder {
             }
 
             @Override
-            public Void getResult() {
-                return null;
-            }
-
-            @Override
             public void failed(final Exception cause) {
                 stats.incFailureCount();
                 final FutureCallback<Void> resultCallback = resultCallbackRef.getAndSet(null);
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MessageExchangeHandler.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MessageExchangeHandler.java
index 0e7565e..8031412 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MessageExchangeHandler.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MessageExchangeHandler.java
@@ -26,6 +26,7 @@
  */
 package org.apache.hc.core5.testing.nio;
 
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpRequest;
@@ -48,6 +49,10 @@ public abstract class MessageExchangeHandler<T> extends AbstractServerExchangeHa
         this.requestConsumer = requestConsumer;
     }
 
+    public MessageExchangeHandler(final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier) {
+        this(new BasicRequestConsumer<>(dataConsumerSupplier));
+    }
+
     public MessageExchangeHandler(final AsyncEntityConsumer<T> entityConsumer) {
         this(new BasicRequestConsumer<>(entityConsumer));
     }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestConsumer.java
index c4220f4..89f58a4 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestConsumer.java
@@ -63,12 +63,4 @@ public interface AsyncRequestConsumer<T> extends AsyncDataConsumer {
      */
     void failed(Exception cause);
 
-    /**
-     * Returns the result of request processing when it becomes available or {@code null}
-     * if the request is still being received.
-     *
-     * @return the request processing result.
-     */
-    T getResult();
-
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseConsumer.java
index 9c14b3d..e5a0be6 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseConsumer.java
@@ -71,12 +71,4 @@ public interface AsyncResponseConsumer<T> extends AsyncDataConsumer {
      */
     void failed(Exception cause);
 
-    /**
-     * Returns the result of response processing when it becomes available or {@code null}
-     * if the response is still being received.
-     *
-     * @return the response processing result.
-     */
-    T getResult();
-
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestConsumer.java
index 052b67d..7be827f 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestConsumer.java
@@ -29,8 +29,10 @@ package org.apache.hc.core5.http.nio;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
@@ -38,7 +40,6 @@ import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.Message;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.util.Args;
-import org.apache.hc.core5.util.Asserts;
 
 /**
  * Basic implementation of {@link AsyncRequestConsumer} that represents the request message as
@@ -48,12 +49,23 @@ import org.apache.hc.core5.util.Asserts;
  */
 public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<HttpRequest, T>> {
 
-    private final AsyncEntityConsumer<T> dataConsumer;
+    private final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier;
+    private final AtomicReference<AsyncEntityConsumer<T>> dataConsumerRef;
 
-    private volatile Message<HttpRequest, T> result;
+    public BasicRequestConsumer(final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier) {
+        this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
+        this.dataConsumerRef = new AtomicReference<>(null);
+    }
 
     public BasicRequestConsumer(final AsyncEntityConsumer<T> dataConsumer) {
-        this.dataConsumer = dataConsumer;
+        this(new Supplier<AsyncEntityConsumer<T>>() {
+
+            @Override
+            public AsyncEntityConsumer<T> get() {
+                return dataConsumer;
+            }
+
+        });
     }
 
     @Override
@@ -64,16 +76,20 @@ public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<Htt
             final FutureCallback<Message<HttpRequest, T>> resultCallback) throws HttpException, IOException {
         Args.notNull(request, "Request");
         if (entityDetails != null) {
-            Asserts.notNull(dataConsumer, "Data consumer");
+            final AsyncEntityConsumer<T> dataConsumer = dataConsumerSupplier.get();
+            if (dataConsumer == null) {
+                throw new HttpException("Supplied data consumer is null");
+            }
+            dataConsumerRef.set(dataConsumer);
+
             dataConsumer.streamStart(entityDetails, new FutureCallback<T>() {
 
                 @Override
                 public void completed(final T body) {
-                    result = new Message<>(request, body);
+                    final Message<HttpRequest, T> result = new Message<>(request, body);
                     if (resultCallback != null) {
                         resultCallback.completed(result);
                     }
-                    dataConsumer.releaseResources();
                 }
 
                 @Override
@@ -81,7 +97,6 @@ public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<Htt
                     if (resultCallback != null) {
                         resultCallback.failed(ex);
                     }
-                    dataConsumer.releaseResources();
                 }
 
                 @Override
@@ -89,34 +104,32 @@ public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<Htt
                     if (resultCallback != null) {
                         resultCallback.cancelled();
                     }
-                    dataConsumer.releaseResources();
                 }
 
             });
         } else {
-            result = new Message<>(request, null);
+            final Message<HttpRequest, T> result = new Message<>(request, null);
             if (resultCallback != null) {
                 resultCallback.completed(result);
             }
-            releaseResources();
         }
     }
 
     @Override
     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
-        Asserts.notNull(dataConsumer, "Data consumer");
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
         dataConsumer.updateCapacity(capacityChannel);
     }
 
     @Override
     public void consume(final ByteBuffer src) throws IOException {
-        Asserts.notNull(dataConsumer, "Data consumer");
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
         dataConsumer.consume(src);
     }
 
     @Override
     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
-        Asserts.notNull(dataConsumer, "Data consumer");
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
         dataConsumer.streamEnd(trailers);
     }
 
@@ -126,12 +139,8 @@ public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<Htt
     }
 
     @Override
-    public Message<HttpRequest, T> getResult() {
-        return result;
-    }
-
-    @Override
     public void releaseResources() {
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.getAndSet(null);
         if (dataConsumer != null) {
             dataConsumer.releaseResources();
         }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseConsumer.java
index 1aa31b8..2501985 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseConsumer.java
@@ -29,8 +29,10 @@ package org.apache.hc.core5.http.nio;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
@@ -48,12 +50,23 @@ import org.apache.hc.core5.util.Args;
  */
 public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<HttpResponse, T>> {
 
-    private final AsyncEntityConsumer<T> dataConsumer;
+    private final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier;
+    private final AtomicReference<AsyncEntityConsumer<T>> dataConsumerRef;
 
-    private volatile Message<HttpResponse, T> result;
+    public BasicResponseConsumer(final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier) {
+        this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
+        this.dataConsumerRef = new AtomicReference<>(null);
+    }
 
     public BasicResponseConsumer(final AsyncEntityConsumer<T> dataConsumer) {
-        this.dataConsumer = Args.notNull(dataConsumer, "Consumer");
+        this(new Supplier<AsyncEntityConsumer<T>>() {
+
+            @Override
+            public AsyncEntityConsumer<T> get() {
+                return dataConsumer;
+            }
+
+        });
     }
 
     @Override
@@ -64,15 +77,19 @@ public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<H
         Args.notNull(response, "Response");
 
         if (entityDetails != null) {
+            final AsyncEntityConsumer<T> dataConsumer = dataConsumerSupplier.get();
+            if (dataConsumer == null) {
+                throw new HttpException("Supplied data consumer is null");
+            }
+            dataConsumerRef.set(dataConsumer);
             dataConsumer.streamStart(entityDetails, new FutureCallback<T>() {
 
                 @Override
                 public void completed(final T body) {
-                    result = new Message<>(response, body);
+                    final Message<HttpResponse, T> result = new Message<>(response, body);
                     if (resultCallback != null) {
                         resultCallback.completed(result);
                     }
-                    dataConsumer.releaseResources();
                 }
 
                 @Override
@@ -91,11 +108,10 @@ public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<H
 
             });
         } else {
-            result = new Message<>(response, null);
+            final Message<HttpResponse, T> result = new Message<>(response, null);
             if (resultCallback != null) {
                 resultCallback.completed(result);
             }
-            dataConsumer.releaseResources();
         }
     }
 
@@ -105,16 +121,19 @@ public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<H
 
     @Override
     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
         dataConsumer.updateCapacity(capacityChannel);
     }
 
     @Override
     public void consume(final ByteBuffer src) throws IOException {
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
         dataConsumer.consume(src);
     }
 
     @Override
     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
         dataConsumer.streamEnd(trailers);
     }
 
@@ -124,13 +143,11 @@ public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<H
     }
 
     @Override
-    public Message<HttpResponse, T> getResult() {
-        return result;
-    }
-
-    @Override
     public void releaseResources() {
-        dataConsumer.releaseResources();
+        final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.getAndSet(null);
+        if (dataConsumer != null) {
+            dataConsumer.releaseResources();
+        }
     }
 
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncRequesterConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncRequesterConsumer.java
index 3864a08..da6b861 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncRequesterConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncRequesterConsumer.java
@@ -30,8 +30,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.UnsupportedCharsetException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
@@ -54,13 +56,23 @@ import org.apache.hc.core5.util.Args;
  */
 public abstract class AbstractAsyncRequesterConsumer<T, E> implements AsyncRequestConsumer<T> {
 
-    private final AsyncEntityConsumer<E> entityConsumer;
+    private final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier;
+    private final AtomicReference<AsyncEntityConsumer<E>> dataConsumerRef;
 
-    private volatile T result;
+    public AbstractAsyncRequesterConsumer(final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier) {
+        this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
+        this.dataConsumerRef = new AtomicReference<>(null);
+    }
+
+    public AbstractAsyncRequesterConsumer(final AsyncEntityConsumer<E> dataConsumer) {
+        this(new Supplier<AsyncEntityConsumer<E>>() {
 
-    public AbstractAsyncRequesterConsumer(final AsyncEntityConsumer<E> entityConsumer) {
-        Args.notNull(entityConsumer, "Entity consumer");
-        this.entityConsumer = entityConsumer;
+            @Override
+            public AsyncEntityConsumer<E> get() {
+                return dataConsumer;
+            }
+
+        });
     }
 
     /**
@@ -78,14 +90,19 @@ public abstract class AbstractAsyncRequesterConsumer<T, E> implements AsyncReque
             final EntityDetails entityDetails,
             final HttpContext httpContext, final FutureCallback<T> resultCallback) throws HttpException, IOException {
         if (entityDetails != null) {
-            entityConsumer.streamStart(entityDetails, new FutureCallback<E>() {
+            final AsyncEntityConsumer<E> dataConsumer = dataConsumerSupplier.get();
+            if (dataConsumer == null) {
+                throw new HttpException("Supplied data consumer is null");
+            }
+            dataConsumerRef.set(dataConsumer);
+            dataConsumer.streamStart(entityDetails, new FutureCallback<E>() {
 
                 @Override
                 public void completed(final E entity) {
                     final ContentType contentType;
                     try {
                         contentType = ContentType.parse(entityDetails.getContentType());
-                        result = buildResult(request, entity, contentType);
+                        final T result = buildResult(request, entity, contentType);
                         resultCallback.completed(result);
                     } catch (final UnsupportedCharsetException ex) {
                         resultCallback.failed(ex);
@@ -105,29 +122,26 @@ public abstract class AbstractAsyncRequesterConsumer<T, E> implements AsyncReque
             });
         } else {
             resultCallback.completed(buildResult(request, null, null));
-            entityConsumer.releaseResources();
         }
 
     }
 
     @Override
     public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
-        entityConsumer.updateCapacity(capacityChannel);
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+        dataConsumer.updateCapacity(capacityChannel);
     }
 
     @Override
     public final void consume(final ByteBuffer src) throws IOException {
-        entityConsumer.consume(src);
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+        dataConsumer.consume(src);
     }
 
     @Override
     public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
-        entityConsumer.streamEnd(trailers);
-    }
-
-    @Override
-    public T getResult() {
-        return result;
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+        dataConsumer.streamEnd(trailers);
     }
 
     @Override
@@ -137,7 +151,10 @@ public abstract class AbstractAsyncRequesterConsumer<T, E> implements AsyncReque
 
     @Override
     public final void releaseResources() {
-        entityConsumer.releaseResources();
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.getAndSet(null);
+        if (dataConsumer != null) {
+            dataConsumer.releaseResources();
+        }
     }
 
 }
\ No newline at end of file
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
index 0472f8f..9012ee2 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
@@ -55,15 +56,23 @@ import org.apache.hc.core5.util.Args;
  */
 public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncResponseConsumer<T> {
 
-    private final AsyncEntityConsumer<E> entityConsumer;
-    private final AtomicReference<T> resultRef;
-    private final AtomicReference<Exception> exceptionRef;
+    private final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier;
+    private final AtomicReference<AsyncEntityConsumer<E>> dataConsumerRef;
 
-    public AbstractAsyncResponseConsumer(final AsyncEntityConsumer<E> entityConsumer) {
-        Args.notNull(entityConsumer, "Entity consumer");
-        this.entityConsumer = entityConsumer;
-        this.resultRef = new AtomicReference<>(null);
-        this.exceptionRef = new AtomicReference<>(null);
+    public AbstractAsyncResponseConsumer(final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier) {
+        this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
+        this.dataConsumerRef = new AtomicReference<>(null);
+    }
+
+    public AbstractAsyncResponseConsumer(final AsyncEntityConsumer<E> dataConsumer) {
+        this(new Supplier<AsyncEntityConsumer<E>>() {
+
+            @Override
+            public AsyncEntityConsumer<E> get() {
+                return dataConsumer;
+            }
+
+        });
     }
 
     /**
@@ -81,7 +90,12 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
             final EntityDetails entityDetails,
             final HttpContext httpContext, final FutureCallback<T> resultCallback) throws HttpException, IOException {
         if (entityDetails != null) {
-            entityConsumer.streamStart(entityDetails, new FutureCallback<E>() {
+            final AsyncEntityConsumer<E> dataConsumer = dataConsumerSupplier.get();
+            if (dataConsumer == null) {
+                throw new HttpException("Supplied data consumer is null");
+            }
+            dataConsumerRef.set(dataConsumer);
+            dataConsumer.streamStart(entityDetails, new FutureCallback<E>() {
 
                 @Override
                 public void completed(final E entity) {
@@ -89,12 +103,10 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
                     try {
                         contentType = ContentType.parse(entityDetails.getContentType());
                         final T result = buildResult(response, entity, contentType);
-                        resultRef.compareAndSet(null, result);
                         if (resultCallback != null) {
                             resultCallback.completed(result);
                         }
                     } catch (final UnsupportedCharsetException ex) {
-                        exceptionRef.compareAndSet(null, ex);
                         if (resultCallback != null) {
                             resultCallback.failed(ex);
                         }
@@ -103,7 +115,6 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
 
                 @Override
                 public void failed(final Exception ex) {
-                    exceptionRef.compareAndSet(null, ex);
                     if (resultCallback != null) {
                         resultCallback.failed(ex);
                     }
@@ -119,48 +130,42 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
             });
         } else {
             final T result = buildResult(response, null, null);
-            resultRef.compareAndSet(null, result);
             if (resultCallback != null) {
                 resultCallback.completed(result);
             }
-            entityConsumer.releaseResources();
         }
 
     }
 
     @Override
     public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
-        entityConsumer.updateCapacity(capacityChannel);
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+        dataConsumer.updateCapacity(capacityChannel);
     }
 
     @Override
     public final void consume(final ByteBuffer src) throws IOException {
-        entityConsumer.consume(src);
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+        dataConsumer.consume(src);
     }
 
     @Override
     public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
-        entityConsumer.streamEnd(trailers);
-    }
-
-    @Override
-    public T getResult() {
-        return resultRef.get();
-    }
-
-    public Exception getException() {
-        return exceptionRef.get();
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
+        dataConsumer.streamEnd(trailers);
     }
 
     @Override
     public final void failed(final Exception cause) {
-        exceptionRef.compareAndSet(null, cause);
         releaseResources();
     }
 
     @Override
     public final void releaseResources() {
-        entityConsumer.releaseResources();
+        final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.getAndSet(null);
+        if (dataConsumer != null) {
+            dataConsumer.releaseResources();
+        }
     }
 
 }
\ No newline at end of file