You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Claus Ibsen (JIRA)" <ji...@apache.org> on 2019/07/31 09:21:00 UTC
[jira] [Resolved] (CAMEL-9370) Support for async/deferred content
[ https://issues.apache.org/jira/browse/CAMEL-9370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Ibsen resolved CAMEL-9370.
--------------------------------
Resolution: Abandoned
> Support for async/deferred content
> ----------------------------------
>
> Key: CAMEL-9370
> URL: https://issues.apache.org/jira/browse/CAMEL-9370
> Project: Camel
> Issue Type: Improvement
> Components: camel-jetty
> Affects Versions: 2.16.1
> Reporter: Tuomas Kiviaho
> Priority: Major
>
> I'm receiving {{text/event-stream}} and to my surprise there was no support of anykind for {{AsyncContentProvider}}.
> Hence here are my dealings with the issue in a form of a patch. Some of the bits would work out of the box as-is, but others (such as {{jettyBinding instanceof Response.ResponseListener}}). I've rammed in just to get it working. It would be nice if this type of streaming could be enabled without having to declare {{jettyBinding}}. A new option for instance would be more practical solution.
> {code:title=org/apache/camel/component/jetty/DefaultJettyHttpBinding.java}
> @@ -191,7 +191,8 @@
> }
> } else {
> // just grab the raw content body
> - return httpExchange.getBody();
> + byte[] body = httpExchange.getBody();
> + return body == null ? httpExchange.getResponseContentProvider() : body;
> }
> }
>
> {code}
> {code:title=org/apache/camel/component/jetty/JettyContentExchange.java}
> @@ -25,6 +25,7 @@
> import org.apache.camel.AsyncCallback;
> import org.apache.camel.Exchange;
> import org.eclipse.jetty.client.HttpClient;
> +import org.eclipse.jetty.client.api.ContentProvider;
>
> public interface JettyContentExchange {
>
> @@ -44,6 +45,8 @@
> void setRequestContent(String data, String charset) throws UnsupportedEncodingException;
>
> void setRequestContent(InputStream ins);
> +
> + void setRequestContent(ContentProvider contentProvider);
>
> void addRequestHeader(String key, String s);
>
> @@ -63,6 +66,8 @@
> int getResponseStatus();
>
> byte[] getResponseContentBytes();
> +
> + ContentProvider getResponseContentProvider();
>
> Map<String, Collection<String>> getResponseHeaders();
>
> {code}
> {code:title=org/apache/camel/component/jetty/JettyHttpProducer.java}
> @@ -32,7 +32,6 @@
> import org.apache.camel.Message;
> import org.apache.camel.http.common.HttpConstants;
> import org.apache.camel.http.common.HttpHelper;
> -import org.apache.camel.http.common.HttpMethods;
> import org.apache.camel.impl.DefaultAsyncProducer;
> import org.apache.camel.spi.HeaderFilterStrategy;
> import org.apache.camel.util.ExchangeHelper;
> @@ -40,6 +39,7 @@
> import org.apache.camel.util.ObjectHelper;
> import org.apache.camel.util.URISupport;
> import org.eclipse.jetty.client.HttpClient;
> +import org.eclipse.jetty.client.api.ContentProvider;
> import org.eclipse.jetty.util.component.LifeCycle;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> @@ -160,6 +160,9 @@
> // (for example application/x-www-form-urlencoded forms being sent)
> String charset = IOHelper.getCharsetName(exchange, false);
> httpExchange.setRequestContent(data, charset);
> + } else if (body instanceof ContentProvider) {
> + ContentProvider contentProvider = (ContentProvider) body;
> + httpExchange.setRequestContent(contentProvider);
> } else {
> // then fallback to input stream
> InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, exchange.getIn().getBody());
> {code}
> {code:title=org/apache/camel/component/jetty9/JettyContentExchange9.java}
> @@ -21,6 +21,7 @@
> import java.io.InputStream;
> import java.io.UnsupportedEncodingException;
> import java.net.MalformedURLException;
> +import java.nio.ByteBuffer;
> import java.util.Collection;
> import java.util.Map;
> import java.util.TreeMap;
> @@ -35,14 +36,18 @@
> import org.apache.camel.component.jetty.JettyContentExchange;
> import org.apache.camel.component.jetty.JettyHttpBinding;
> import org.eclipse.jetty.client.HttpClient;
> +import org.eclipse.jetty.client.Synchronizable;
> +import org.eclipse.jetty.client.api.ContentProvider;
> import org.eclipse.jetty.client.api.Request;
> import org.eclipse.jetty.client.api.Response;
> import org.eclipse.jetty.client.api.Result;
> import org.eclipse.jetty.client.util.BufferingResponseListener;
> import org.eclipse.jetty.client.util.BytesContentProvider;
> +import org.eclipse.jetty.client.util.DeferredContentProvider;
> import org.eclipse.jetty.client.util.InputStreamContentProvider;
> import org.eclipse.jetty.client.util.StringContentProvider;
> import org.eclipse.jetty.http.HttpFields;
> +import org.eclipse.jetty.util.Callback;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> @@ -60,7 +65,7 @@
> private final CountDownLatch done = new CountDownLatch(1);
> private Request request;
> private Response response;
> - private byte[] responseContent;
> + private Object responseContent;
>
> private String requestContentType;
>
> @@ -183,16 +188,20 @@
> }
>
> public void setRequestContent(byte[] byteArray) {
> - this.request.content(new BytesContentProvider(byteArray), this.requestContentType);
> + this.setRequestContent(new BytesContentProvider(byteArray));
> }
>
> public void setRequestContent(String data, String charset) throws UnsupportedEncodingException {
> StringContentProvider cp = charset != null ? new StringContentProvider(data, charset) : new StringContentProvider(data);
> - this.request.content(cp, this.requestContentType);
> + this.setRequestContent(cp);
> }
>
> public void setRequestContent(InputStream ins) {
> - this.request.content(new InputStreamContentProvider(ins), this.requestContentType);
> + this.setRequestContent(new InputStreamContentProvider(ins));
> + }
> +
> + public void setRequestContent(ContentProvider contentProvider) {
> + this.request.content(contentProvider, this.requestContentType);
> }
>
> public void addRequestHeader(String key, String s) {
> @@ -213,7 +222,66 @@
> }
>
> };
> - BufferingResponseListener responseListener = new BufferingResponseListener() {
> + Response.CompleteListener responseListener = jettyBinding instanceof Response.ResponseListener ? new Response.Listener.Adapter() {
> +
> + @Override
> + public void onHeaders(Response response) {
> + LOG.trace("onResponseComplete");
> + done.countDown();
> + JettyContentExchange9.this.response = response;
> + JettyContentExchange9.this.responseContent = new DeferredContentProvider() {
> +
> + @Override
> + public long getLength()
> + {
> + return -1;
> + }
> +
> + };
> + try {
> + jettyBinding.populateResponse(exchange, JettyContentExchange9.this);
> + } catch (Exception e) {
> + exchange.setException(e);
> + } finally {
> + JettyContentExchange9.this.callback.done(false);
> + }
> + }
> +
> + @Override
> + public void onContent(Response response, ByteBuffer content, Callback callback) {
> + DeferredContentProvider deferredContentProvider = (DeferredContentProvider) JettyContentExchange9.this.responseContent;
> + if (!deferredContentProvider.offer(content, callback)) {
> + Synchronizable synchronizable = (Synchronizable) JettyContentExchange9.this.responseContent;
> + Object lock = synchronizable.getLock();
> + synchronized (lock) {
> + if (!deferredContentProvider.offer(content, callback)) {
> + try {
> + lock.wait();
> + } catch (InterruptedException e) {
> + callback.failed(e);
> + }
> + }
> + }
> + }
> + }
> +
> + @Override
> + public void onFailure(Response response, Throwable failure)
> + {
> + doTaskCompleted(failure);
> + }
> +
> + @Override
> + public void onComplete(Result result) {
> + DeferredContentProvider contentProvider = (DeferredContentProvider) JettyContentExchange9.this.responseContent;
> + if (result.isSucceeded()) {
> + contentProvider.close();
> + } else {
> + contentProvider.failed(result.getFailure());
> + }
> + }
> +
> + } : new BufferingResponseListener() {
>
> @Override
> public void onComplete(Result result) {
> @@ -232,7 +300,11 @@
> }
>
> public byte[] getResponseContentBytes() {
> - return responseContent;
> + return (byte[]) responseContent;
> + }
> +
> + public ContentProvider getResponseContentProvider() {
> + return (ContentProvider) responseContent;
> }
>
> private Map<String, Collection<String>> getFieldsAsMap(HttpFields fields) {
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)