You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hc.apache.org by rschmitt <gi...@git.apache.org> on 2018/08/21 04:49:50 UTC

[GitHub] httpcomponents-core pull request #74: HTTPCLIENT-1942: Add support for React...

GitHub user rschmitt opened a pull request:

    https://github.com/apache/httpcomponents-core/pull/74

    HTTPCLIENT-1942: Add support for Reactive Streams

    This contribution adds initial support for the [Reactive Streams specification](http://www.reactive-streams.org/). The main part of this change is a pair of non-public classes, the `ReactiveDataProducer` and `ReactiveDataConsumer`, which are reactive adapters for Apache's AsyncDataProducer and AsyncDataConsumer interfaces. Two public classes are built on top of these types:
    
    * `ReactiveEntityProducer`: An `AsyncEntityProducer` implementation backed by `ReactiveDataProducer` that allows a `Publisher<ByteBuffer>` to be streamed as a request body
    * `ReactiveResponseConsumer`: An `AsyncResponseConsumer` that exposes the overall HTTP response as a `Message<HttpResponse, Publisher<ByteBuffer>>`, allowing the body to be consumed by a `Subscriber<ByteBuffer>`
    
    These classes are enough to allow the async Apache client to straightforwardly integrate with reactive frameworks like [RxJava](https://github.com/ReactiveX/RxJava), [Akka](https://akka.io/), and [Vert.x](https://vertx.io/).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rschmitt/httpcomponents-core reactive

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/httpcomponents-core/pull/74.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #74
    
----
commit 1c4713f40e0175de5c0e3053abc2f1e11a597d57
Author: Ryan Schmitt <ry...@...>
Date:   2018-08-17T21:16:04Z

    HTTPCLIENT-1942: Add support for Reactive Streams
    
    This commit adds initial support for the Reactive Streams specification
    [1]. The main part of this change is a pair of non-public classes, the
    ReactiveDataProducer and ReactiveDataConsumer, which are reactive
    adapters for Apache's AsyncDataProducer and AsyncDataConsumer
    interfaces. Two public classes are built on top of these types:
    
    * ReactiveEntityProducer: An AsyncEntityProducer implementation backed
      by ReactiveDataProducer that allows a Publisher<ByteBuffer> to be
      streamed as a request body
    * ReactiveResponseConsumer: An AsyncResponseConsumer that exposes the
      overall HTTP response as a Message<HttpResponse, Publisher<ByteBuffer>>,
      allowing the body to be consumed by a Subscriber<ByteBuffer>
    
    [1] http://www.reactive-streams.org/

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


[GitHub] httpcomponents-core pull request #74: HTTPCLIENT-1942: Add support for React...

Posted by ok2c <gi...@git.apache.org>.
Github user ok2c commented on a diff in the pull request:

    https://github.com/apache/httpcomponents-core/pull/74#discussion_r212403300
  
    --- Diff: httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java ---
    @@ -0,0 +1,175 @@
    +/*
    + * ====================================================================
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + * ====================================================================
    + *
    + * This software consists of voluntary contributions made by many
    + * individuals on behalf of the Apache Software Foundation.  For more
    + * information on the Apache Software Foundation, please see
    + * <http://www.apache.org/>.
    + *
    + */
    +package org.apache.hc.core5.reactive;
    +
    +import org.apache.hc.core5.annotation.Contract;
    +import org.apache.hc.core5.annotation.ThreadingBehavior;
    +import org.apache.hc.core5.http.ProtocolException;
    +import org.apache.hc.core5.http.nio.AsyncDataProducer;
    +import org.apache.hc.core5.http.nio.DataStreamChannel;
    +import org.apache.hc.core5.util.Args;
    +import org.reactivestreams.Publisher;
    +import org.reactivestreams.Subscriber;
    +import org.reactivestreams.Subscription;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayDeque;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * An asynchronous data producer that supports Reactive Streams.
    + *
    + * @since 5.0
    + */
    +@Contract(threading = ThreadingBehavior.SAFE)
    +final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBuffer> {
    +
    +    private static final int BUFFER_WINDOW_SIZE = 5;
    +
    +    private final AtomicReference<DataStreamChannel> requestChannel = new AtomicReference<>();
    +    private final AtomicReference<Throwable> exception = new AtomicReference<>(null);
    +    private final AtomicBoolean complete = new AtomicBoolean(false);
    +    private final Publisher<ByteBuffer> publisher;
    +    private final AtomicReference<Subscription> subscription = new AtomicReference<>(null);
    +    private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(); // This field requires synchronization
    +
    +    public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
    +        this.publisher = Args.notNull(publisher, "publisher");
    +    }
    +
    +    @Override
    +    public void onSubscribe(final Subscription subscription) {
    +        if (this.subscription.getAndSet(subscription) != null) {
    +            throw new IllegalStateException("Already subscribed");
    +        }
    +
    +        subscription.request(BUFFER_WINDOW_SIZE);
    +    }
    +
    +    @Override
    +    public void onNext(final ByteBuffer byteBuffer) {
    +        final byte[] copy = new byte[byteBuffer.remaining()];
    +        byteBuffer.get(copy);
    +        synchronized (buffers) {
    +            buffers.add(ByteBuffer.wrap(copy));
    +        }
    +        signalReadiness();
    +    }
    +
    +    @Override
    +    public void onError(final Throwable throwable) {
    +        subscription.set(null);
    +        exception.set(throwable);
    +        signalReadiness();
    +    }
    +
    +    @Override
    +    public void onComplete() {
    +        subscription.set(null);
    +        complete.set(true);
    +        signalReadiness();
    +    }
    +
    +    private void signalReadiness() {
    +        final DataStreamChannel channel = requestChannel.get();
    +        if (channel == null) {
    +            throw new IllegalStateException("Output channel is not set");
    +        }
    +        channel.requestOutput();
    +    }
    +
    +    @Override
    +    public int available() {
    +        if (exception.get() != null || complete.get()) {
    +            return 1;
    +        } else {
    +            synchronized (buffers) {
    +                int sum = 0;
    +                for (final ByteBuffer buffer : buffers) {
    +                    sum += buffer.remaining();
    +                }
    +                return sum;
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void produce(final DataStreamChannel channel) throws IOException {
    +        if (requestChannel.get() == null) {
    +            requestChannel.set(channel);
    +            publisher.subscribe(this);
    +        }
    +
    +        final Throwable t = exception.get();
    +        final Subscription s = subscription.get();
    +        int buffersToReplenish = 0;
    +        try {
    +            synchronized (buffers) {
    +                if (t != null) {
    +                    // TODO: We need a reliable way to send RST_STREAM (without subsequent GOAWAY) on HTTP/2.
    --- End diff --
    
    @rschmitt By the way, why not using a subclass of `H2StreamResetException` here?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


[GitHub] httpcomponents-core pull request #74: HTTPCLIENT-1942: Add support for React...

Posted by rschmitt <gi...@git.apache.org>.
Github user rschmitt commented on a diff in the pull request:

    https://github.com/apache/httpcomponents-core/pull/74#discussion_r212421284
  
    --- Diff: httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * ====================================================================
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + * ====================================================================
    + *
    + * This software consists of voluntary contributions made by many
    + * individuals on behalf of the Apache Software Foundation.  For more
    + * information on the Apache Software Foundation, please see
    + * <http://www.apache.org/>.
    + *
    + */
    +package org.apache.hc.core5.reactive;
    +
    +import org.apache.hc.core5.annotation.Contract;
    +import org.apache.hc.core5.annotation.ThreadingBehavior;
    +import org.apache.hc.core5.concurrent.BasicFuture;
    +import org.apache.hc.core5.concurrent.FutureCallback;
    +import org.apache.hc.core5.http.EntityDetails;
    +import org.apache.hc.core5.http.Header;
    +import org.apache.hc.core5.http.HttpResponse;
    +import org.apache.hc.core5.http.Message;
    +import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
    +import org.apache.hc.core5.http.nio.CapacityChannel;
    +import org.apache.hc.core5.http.protocol.HttpContext;
    +import org.apache.hc.core5.util.Args;
    +import org.reactivestreams.Publisher;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.concurrent.Future;
    +
    +/**
    + * An {@link AsyncResponseConsumer} that publishes the response body through
    + * a {@link Publisher}, as defined by the Reactive Streams specification. The
    + * response is represented as a {@link Message} consisting of a {@link
    + * HttpResponse} representing the headers and a {@link Publisher} representing
    + * the response body as an asynchronous stream of {@link ByteBuffer} instances.
    + *
    + * @since 5.0
    + */
    +@Contract(threading = ThreadingBehavior.SAFE)
    +public final class ReactiveResponseConsumer implements AsyncResponseConsumer<Void> {
    +
    +    private final ReactiveDataConsumer reactiveDataConsumer = new ReactiveDataConsumer();
    +    private final List<Header> trailers = Collections.synchronizedList(new ArrayList<Header>());
    +    private final BasicFuture<Message<HttpResponse, Publisher<ByteBuffer>>> responseFuture;
    +
    +    private volatile BasicFuture<Void> responseCompletion;
    +    private volatile HttpResponse informationResponse;
    +    private volatile EntityDetails entityDetails;
    +
    +    /**
    +     * Creates a {@code ReactiveResponseConsumer}.
    +     */
    +    public ReactiveResponseConsumer() {
    +        this.responseFuture = new BasicFuture<>(null);
    +    }
    +
    +    /**
    +     * Creates a {@code ReactiveResponseConsumer} that will call back the supplied {@link FutureCallback} with a
    +     * streamable response.
    +     *
    +     * @param responseCallback the callback to invoke when the response is available for consumption.
    +     */
    +    public ReactiveResponseConsumer(final FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> responseCallback) {
    +        this.responseFuture = new BasicFuture<>(Args.notNull(responseCallback, "responseCallback"));
    +    }
    +
    +    public Future<Message<HttpResponse, Publisher<ByteBuffer>>> getResponseFuture() {
    +        return responseFuture;
    +    }
    +
    +    /**
    +     * Returns the intermediate (1xx) HTTP response if one was received.
    +     *
    +     * @return the information response, or {@code null} if none.
    +     */
    +    public HttpResponse getInformationResponse() {
    +        return informationResponse;
    +    }
    +
    +    /**
    +     * Returns the response entity details.
    +     *
    +     * @return the entity details, or {@code null} if none.
    +     */
    +    public EntityDetails getEntityDetails() {
    +        return entityDetails;
    +    }
    +
    +    /**
    +     * Returns the trailers received at the end of the response.
    +     *
    +     * @return a non-null list of zero or more trailers.
    +     */
    +    public List<Header> getTrailers() {
    +        return trailers;
    +    }
    +
    +    @Override
    +    public void consumeResponse(
    +        final HttpResponse response,
    +        final EntityDetails entityDetails,
    +        final HttpContext httpContext,
    +        final FutureCallback<Void> resultCallback
    +    ) {
    +        this.entityDetails = entityDetails;
    +        this.responseCompletion = new BasicFuture<>(resultCallback);
    +        this.responseFuture.completed(new Message<HttpResponse, Publisher<ByteBuffer>>(response, reactiveDataConsumer));
    --- End diff --
    
    I'm not sure what you mean by `Consumer`, because I thought this entire project was targeting JDK7. (RxJava provides a `Consumer<T>` interface, but the Reactive Streams API does not.) Either way, I would argue that we *do* need to return a `Future`, for the same reason that the `#execute` method returns a `Future`: callers need a single thing that they can block on (or decorate with callback logic) to wait for success, failure, or cancellation.
    
    In the integration tests, there is a fairly consistent idiom:
    
    ```java
            final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
            requester.execute(request, consumer, Timeout.ofSeconds(2), null);
            final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture().get();
    ```
    
    With a `Consumer`-based API, a caller might write:
    
    ```java
            final BlockingQueue<Message<HttpResponse, Publisher<ByteBuffer>>> streamingResponse =
                new ArrayBlockingQueue<>(1);
            final ReactiveResponseConsumer consumer =
                new ReactiveResponseConsumer(new Consumer<Message<HttpResponse, Publisher<ByteBuffer>>>() {
                    @Override
                    public void accept(final Message<HttpResponse, Publisher<ByteBuffer>> httpResponsePublisherMessage) {
                        streamingResponse.offer(httpResponsePublisherMessage);
                    }
                });
            requester.execute(request, consumer, Timeout.ofSeconds(2), null);
            final Message<HttpResponse, Publisher<ByteBuffer>> response = streamingResponse.take();
    ```
    
    However, the above code will block forever if an exception occurs and the response fails to come back. One way to fix this is by setting up a `CompletableFuture` or `BasicFuture` and then completing it from *both* callbacks:
    
    ```java
            final CompletableFuture<Message<HttpResponse, Publisher<ByteBuffer>>> streamingResponse =
                new CompletableFuture<>();
            final ReactiveResponseConsumer consumer =
                new ReactiveResponseConsumer(new Consumer<Message<HttpResponse, Publisher<ByteBuffer>>>() {
                    @Override
                    public void accept(final Message<HttpResponse, Publisher<ByteBuffer>> httpResponsePublisherMessage) {
                        streamingResponse.complete(httpResponsePublisherMessage);
                    }
                });
            requester.execute(request, consumer, Timeout.ofSeconds(2), new FutureCallback<Void>() {
                @Override
                public void completed(Void result) {
                    // TODO: Can this race with the other callback?
                    streamingResponse.completeExceptionally(new RuntimeException("No stream returned"));
                }
    
                @Override
                public void failed(Exception ex) {
                    streamingResponse.completeExceptionally(ex);
                }
    
                @Override
                public void cancelled() {
                    streamingResponse.completeExceptionally(new CancellationException());
                }
            });
            final Message<HttpResponse, Publisher<ByteBuffer>> response = streamingResponse.join();
    ```
    
    In short, constructing a correctly functioning `Future` for the streaming response is definitely something I want to do on the caller's behalf, rather than asking the caller to construct one manually from lower-level primitives.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


[GitHub] httpcomponents-core pull request #74: HTTPCLIENT-1942: Add support for React...

Posted by rschmitt <gi...@git.apache.org>.
Github user rschmitt commented on a diff in the pull request:

    https://github.com/apache/httpcomponents-core/pull/74#discussion_r213104118
  
    --- Diff: httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java ---
    @@ -0,0 +1,175 @@
    +/*
    + * ====================================================================
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + * ====================================================================
    + *
    + * This software consists of voluntary contributions made by many
    + * individuals on behalf of the Apache Software Foundation.  For more
    + * information on the Apache Software Foundation, please see
    + * <http://www.apache.org/>.
    + *
    + */
    +package org.apache.hc.core5.reactive;
    +
    +import org.apache.hc.core5.annotation.Contract;
    +import org.apache.hc.core5.annotation.ThreadingBehavior;
    +import org.apache.hc.core5.http.ProtocolException;
    +import org.apache.hc.core5.http.nio.AsyncDataProducer;
    +import org.apache.hc.core5.http.nio.DataStreamChannel;
    +import org.apache.hc.core5.util.Args;
    +import org.reactivestreams.Publisher;
    +import org.reactivestreams.Subscriber;
    +import org.reactivestreams.Subscription;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayDeque;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * An asynchronous data producer that supports Reactive Streams.
    + *
    + * @since 5.0
    + */
    +@Contract(threading = ThreadingBehavior.SAFE)
    +final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBuffer> {
    +
    +    private static final int BUFFER_WINDOW_SIZE = 5;
    +
    +    private final AtomicReference<DataStreamChannel> requestChannel = new AtomicReference<>();
    +    private final AtomicReference<Throwable> exception = new AtomicReference<>(null);
    +    private final AtomicBoolean complete = new AtomicBoolean(false);
    +    private final Publisher<ByteBuffer> publisher;
    +    private final AtomicReference<Subscription> subscription = new AtomicReference<>(null);
    +    private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(); // This field requires synchronization
    +
    +    public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
    +        this.publisher = Args.notNull(publisher, "publisher");
    +    }
    +
    +    @Override
    +    public void onSubscribe(final Subscription subscription) {
    +        if (this.subscription.getAndSet(subscription) != null) {
    +            throw new IllegalStateException("Already subscribed");
    +        }
    +
    +        subscription.request(BUFFER_WINDOW_SIZE);
    +    }
    +
    +    @Override
    +    public void onNext(final ByteBuffer byteBuffer) {
    +        final byte[] copy = new byte[byteBuffer.remaining()];
    +        byteBuffer.get(copy);
    +        synchronized (buffers) {
    +            buffers.add(ByteBuffer.wrap(copy));
    +        }
    +        signalReadiness();
    +    }
    +
    +    @Override
    +    public void onError(final Throwable throwable) {
    +        subscription.set(null);
    +        exception.set(throwable);
    +        signalReadiness();
    +    }
    +
    +    @Override
    +    public void onComplete() {
    +        subscription.set(null);
    +        complete.set(true);
    +        signalReadiness();
    +    }
    +
    +    private void signalReadiness() {
    +        final DataStreamChannel channel = requestChannel.get();
    +        if (channel == null) {
    +            throw new IllegalStateException("Output channel is not set");
    +        }
    +        channel.requestOutput();
    +    }
    +
    +    @Override
    +    public int available() {
    +        if (exception.get() != null || complete.get()) {
    +            return 1;
    +        } else {
    +            synchronized (buffers) {
    +                int sum = 0;
    +                for (final ByteBuffer buffer : buffers) {
    +                    sum += buffer.remaining();
    +                }
    +                return sum;
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void produce(final DataStreamChannel channel) throws IOException {
    +        if (requestChannel.get() == null) {
    +            requestChannel.set(channel);
    +            publisher.subscribe(this);
    +        }
    +
    +        final Throwable t = exception.get();
    +        final Subscription s = subscription.get();
    +        int buffersToReplenish = 0;
    +        try {
    +            synchronized (buffers) {
    +                if (t != null) {
    +                    // TODO: We need a reliable way to send RST_STREAM (without subsequent GOAWAY) on HTTP/2.
    --- End diff --
    
    `H2StreamResetException` works perfectly, except for the additional dependency edge from `:httpcore5-reactive` to `:httpcore5-h2`, which I think is acceptable.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


[GitHub] httpcomponents-core pull request #74: HTTPCLIENT-1942: Add support for React...

Posted by rschmitt <gi...@git.apache.org>.
Github user rschmitt commented on a diff in the pull request:

    https://github.com/apache/httpcomponents-core/pull/74#discussion_r212414367
  
    --- Diff: httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java ---
    @@ -0,0 +1,175 @@
    +/*
    + * ====================================================================
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + * ====================================================================
    + *
    + * This software consists of voluntary contributions made by many
    + * individuals on behalf of the Apache Software Foundation.  For more
    + * information on the Apache Software Foundation, please see
    + * <http://www.apache.org/>.
    + *
    + */
    +package org.apache.hc.core5.reactive;
    +
    +import org.apache.hc.core5.annotation.Contract;
    +import org.apache.hc.core5.annotation.ThreadingBehavior;
    +import org.apache.hc.core5.http.ProtocolException;
    +import org.apache.hc.core5.http.nio.AsyncDataProducer;
    +import org.apache.hc.core5.http.nio.DataStreamChannel;
    +import org.apache.hc.core5.util.Args;
    +import org.reactivestreams.Publisher;
    +import org.reactivestreams.Subscriber;
    +import org.reactivestreams.Subscription;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayDeque;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * An asynchronous data producer that supports Reactive Streams.
    + *
    + * @since 5.0
    + */
    +@Contract(threading = ThreadingBehavior.SAFE)
    +final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBuffer> {
    +
    +    private static final int BUFFER_WINDOW_SIZE = 5;
    +
    +    private final AtomicReference<DataStreamChannel> requestChannel = new AtomicReference<>();
    +    private final AtomicReference<Throwable> exception = new AtomicReference<>(null);
    +    private final AtomicBoolean complete = new AtomicBoolean(false);
    +    private final Publisher<ByteBuffer> publisher;
    +    private final AtomicReference<Subscription> subscription = new AtomicReference<>(null);
    +    private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(); // This field requires synchronization
    +
    +    public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
    +        this.publisher = Args.notNull(publisher, "publisher");
    +    }
    +
    +    @Override
    +    public void onSubscribe(final Subscription subscription) {
    +        if (this.subscription.getAndSet(subscription) != null) {
    +            throw new IllegalStateException("Already subscribed");
    +        }
    +
    +        subscription.request(BUFFER_WINDOW_SIZE);
    +    }
    +
    +    @Override
    +    public void onNext(final ByteBuffer byteBuffer) {
    +        final byte[] copy = new byte[byteBuffer.remaining()];
    +        byteBuffer.get(copy);
    +        synchronized (buffers) {
    +            buffers.add(ByteBuffer.wrap(copy));
    +        }
    +        signalReadiness();
    +    }
    +
    +    @Override
    +    public void onError(final Throwable throwable) {
    +        subscription.set(null);
    +        exception.set(throwable);
    +        signalReadiness();
    +    }
    +
    +    @Override
    +    public void onComplete() {
    +        subscription.set(null);
    +        complete.set(true);
    +        signalReadiness();
    +    }
    +
    +    private void signalReadiness() {
    +        final DataStreamChannel channel = requestChannel.get();
    +        if (channel == null) {
    +            throw new IllegalStateException("Output channel is not set");
    +        }
    +        channel.requestOutput();
    +    }
    +
    +    @Override
    +    public int available() {
    +        if (exception.get() != null || complete.get()) {
    +            return 1;
    +        } else {
    +            synchronized (buffers) {
    +                int sum = 0;
    +                for (final ByteBuffer buffer : buffers) {
    +                    sum += buffer.remaining();
    +                }
    +                return sum;
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void produce(final DataStreamChannel channel) throws IOException {
    +        if (requestChannel.get() == null) {
    +            requestChannel.set(channel);
    +            publisher.subscribe(this);
    +        }
    +
    +        final Throwable t = exception.get();
    +        final Subscription s = subscription.get();
    +        int buffersToReplenish = 0;
    +        try {
    +            synchronized (buffers) {
    +                if (t != null) {
    +                    // TODO: We need a reliable way to send RST_STREAM (without subsequent GOAWAY) on HTTP/2.
    --- End diff --
    
    That's an interesting option, but what happens if I throw it on HTTP/1.1?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


[GitHub] httpcomponents-core pull request #74: HTTPCLIENT-1942: Add support for React...

Posted by ok2c <gi...@git.apache.org>.
Github user ok2c commented on a diff in the pull request:

    https://github.com/apache/httpcomponents-core/pull/74#discussion_r212418676
  
    --- Diff: httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java ---
    @@ -0,0 +1,175 @@
    +/*
    + * ====================================================================
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + * ====================================================================
    + *
    + * This software consists of voluntary contributions made by many
    + * individuals on behalf of the Apache Software Foundation.  For more
    + * information on the Apache Software Foundation, please see
    + * <http://www.apache.org/>.
    + *
    + */
    +package org.apache.hc.core5.reactive;
    +
    +import org.apache.hc.core5.annotation.Contract;
    +import org.apache.hc.core5.annotation.ThreadingBehavior;
    +import org.apache.hc.core5.http.ProtocolException;
    +import org.apache.hc.core5.http.nio.AsyncDataProducer;
    +import org.apache.hc.core5.http.nio.DataStreamChannel;
    +import org.apache.hc.core5.util.Args;
    +import org.reactivestreams.Publisher;
    +import org.reactivestreams.Subscriber;
    +import org.reactivestreams.Subscription;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayDeque;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * An asynchronous data producer that supports Reactive Streams.
    + *
    + * @since 5.0
    + */
    +@Contract(threading = ThreadingBehavior.SAFE)
    +final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBuffer> {
    +
    +    private static final int BUFFER_WINDOW_SIZE = 5;
    +
    +    private final AtomicReference<DataStreamChannel> requestChannel = new AtomicReference<>();
    +    private final AtomicReference<Throwable> exception = new AtomicReference<>(null);
    +    private final AtomicBoolean complete = new AtomicBoolean(false);
    +    private final Publisher<ByteBuffer> publisher;
    +    private final AtomicReference<Subscription> subscription = new AtomicReference<>(null);
    +    private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(); // This field requires synchronization
    +
    +    public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
    +        this.publisher = Args.notNull(publisher, "publisher");
    +    }
    +
    +    @Override
    +    public void onSubscribe(final Subscription subscription) {
    +        if (this.subscription.getAndSet(subscription) != null) {
    +            throw new IllegalStateException("Already subscribed");
    +        }
    +
    +        subscription.request(BUFFER_WINDOW_SIZE);
    +    }
    +
    +    @Override
    +    public void onNext(final ByteBuffer byteBuffer) {
    +        final byte[] copy = new byte[byteBuffer.remaining()];
    +        byteBuffer.get(copy);
    +        synchronized (buffers) {
    +            buffers.add(ByteBuffer.wrap(copy));
    +        }
    +        signalReadiness();
    +    }
    +
    +    @Override
    +    public void onError(final Throwable throwable) {
    +        subscription.set(null);
    +        exception.set(throwable);
    +        signalReadiness();
    +    }
    +
    +    @Override
    +    public void onComplete() {
    +        subscription.set(null);
    +        complete.set(true);
    +        signalReadiness();
    +    }
    +
    +    private void signalReadiness() {
    +        final DataStreamChannel channel = requestChannel.get();
    +        if (channel == null) {
    +            throw new IllegalStateException("Output channel is not set");
    +        }
    +        channel.requestOutput();
    +    }
    +
    +    @Override
    +    public int available() {
    +        if (exception.get() != null || complete.get()) {
    +            return 1;
    +        } else {
    +            synchronized (buffers) {
    +                int sum = 0;
    +                for (final ByteBuffer buffer : buffers) {
    +                    sum += buffer.remaining();
    +                }
    +                return sum;
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void produce(final DataStreamChannel channel) throws IOException {
    +        if (requestChannel.get() == null) {
    +            requestChannel.set(channel);
    +            publisher.subscribe(this);
    +        }
    +
    +        final Throwable t = exception.get();
    +        final Subscription s = subscription.get();
    +        int buffersToReplenish = 0;
    +        try {
    +            synchronized (buffers) {
    +                if (t != null) {
    +                    // TODO: We need a reliable way to send RST_STREAM (without subsequent GOAWAY) on HTTP/2.
    --- End diff --
    
    @rschmitt Nothing spectacular. I'll likely be handled like any other I/O error. I am still open to using a special annotation for exceptions, though, if you like that approach better.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


[GitHub] httpcomponents-core issue #74: HTTPCLIENT-1942: Add support for Reactive Str...

Posted by rschmitt <gi...@git.apache.org>.
Github user rschmitt commented on the issue:

    https://github.com/apache/httpcomponents-core/pull/74
  
    Thanks for taking the time to review this contribution!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


[GitHub] httpcomponents-core pull request #74: HTTPCLIENT-1942: Add support for React...

Posted by rschmitt <gi...@git.apache.org>.
Github user rschmitt commented on a diff in the pull request:

    https://github.com/apache/httpcomponents-core/pull/74#discussion_r212526012
  
    --- Diff: httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java ---
    @@ -0,0 +1,175 @@
    +/*
    + * ====================================================================
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + * ====================================================================
    + *
    + * This software consists of voluntary contributions made by many
    + * individuals on behalf of the Apache Software Foundation.  For more
    + * information on the Apache Software Foundation, please see
    + * <http://www.apache.org/>.
    + *
    + */
    +package org.apache.hc.core5.reactive;
    +
    +import org.apache.hc.core5.annotation.Contract;
    +import org.apache.hc.core5.annotation.ThreadingBehavior;
    +import org.apache.hc.core5.http.ProtocolException;
    +import org.apache.hc.core5.http.nio.AsyncDataProducer;
    +import org.apache.hc.core5.http.nio.DataStreamChannel;
    +import org.apache.hc.core5.util.Args;
    +import org.reactivestreams.Publisher;
    +import org.reactivestreams.Subscriber;
    +import org.reactivestreams.Subscription;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayDeque;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * An asynchronous data producer that supports Reactive Streams.
    + *
    + * @since 5.0
    + */
    +@Contract(threading = ThreadingBehavior.SAFE)
    +final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBuffer> {
    +
    +    private static final int BUFFER_WINDOW_SIZE = 5;
    +
    +    private final AtomicReference<DataStreamChannel> requestChannel = new AtomicReference<>();
    +    private final AtomicReference<Throwable> exception = new AtomicReference<>(null);
    +    private final AtomicBoolean complete = new AtomicBoolean(false);
    +    private final Publisher<ByteBuffer> publisher;
    +    private final AtomicReference<Subscription> subscription = new AtomicReference<>(null);
    +    private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(); // This field requires synchronization
    +
    +    public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
    +        this.publisher = Args.notNull(publisher, "publisher");
    +    }
    +
    +    @Override
    +    public void onSubscribe(final Subscription subscription) {
    +        if (this.subscription.getAndSet(subscription) != null) {
    +            throw new IllegalStateException("Already subscribed");
    +        }
    +
    +        subscription.request(BUFFER_WINDOW_SIZE);
    +    }
    +
    +    @Override
    +    public void onNext(final ByteBuffer byteBuffer) {
    +        final byte[] copy = new byte[byteBuffer.remaining()];
    +        byteBuffer.get(copy);
    +        synchronized (buffers) {
    +            buffers.add(ByteBuffer.wrap(copy));
    +        }
    +        signalReadiness();
    +    }
    +
    +    @Override
    +    public void onError(final Throwable throwable) {
    +        subscription.set(null);
    +        exception.set(throwable);
    +        signalReadiness();
    +    }
    +
    +    @Override
    +    public void onComplete() {
    +        subscription.set(null);
    +        complete.set(true);
    +        signalReadiness();
    +    }
    +
    +    private void signalReadiness() {
    +        final DataStreamChannel channel = requestChannel.get();
    +        if (channel == null) {
    +            throw new IllegalStateException("Output channel is not set");
    +        }
    +        channel.requestOutput();
    +    }
    +
    +    @Override
    +    public int available() {
    +        if (exception.get() != null || complete.get()) {
    +            return 1;
    +        } else {
    +            synchronized (buffers) {
    +                int sum = 0;
    +                for (final ByteBuffer buffer : buffers) {
    +                    sum += buffer.remaining();
    +                }
    +                return sum;
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void produce(final DataStreamChannel channel) throws IOException {
    +        if (requestChannel.get() == null) {
    +            requestChannel.set(channel);
    +            publisher.subscribe(this);
    +        }
    +
    +        final Throwable t = exception.get();
    +        final Subscription s = subscription.get();
    +        int buffersToReplenish = 0;
    +        try {
    +            synchronized (buffers) {
    +                if (t != null) {
    +                    // TODO: We need a reliable way to send RST_STREAM (without subsequent GOAWAY) on HTTP/2.
    --- End diff --
    
    I'll experiment with `H2StreamResetException`. The suggestion to use annotations is creative, but it may be overkill for what I need, which is probably not much.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


[GitHub] httpcomponents-core pull request #74: HTTPCLIENT-1942: Add support for React...

Posted by rschmitt <gi...@git.apache.org>.
Github user rschmitt commented on a diff in the pull request:

    https://github.com/apache/httpcomponents-core/pull/74#discussion_r212413900
  
    --- Diff: httpcore5-reactive/pom.xml ---
    @@ -0,0 +1,87 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  ====================================================================
    +  Licensed to the Apache Software Foundation (ASF) under one
    +  or more contributor license agreements.  See the NOTICE file
    +  distributed with this work for additional information
    +  regarding copyright ownership.  The ASF licenses this file
    +  to you under the Apache License, Version 2.0 (the
    +  "License"); you may not use this file except in compliance
    +  with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +  Unless required by applicable law or agreed to in writing,
    +  software distributed under the License is distributed on an
    +  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +  KIND, either express or implied.  See the License for the
    +  specific language governing permissions and limitations
    +  under the License.
    +  ====================================================================
    +
    +  This software consists of voluntary contributions made by many
    +  individuals on behalf of the Apache Software Foundation.  For more
    +  information on the Apache Software Foundation, please see
    +  <http://www.apache.org />.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +  <parent>
    +    <artifactId>httpcore5-parent</artifactId>
    +    <groupId>org.apache.httpcomponents.core5</groupId>
    +    <version>5.0-beta3-SNAPSHOT</version>
    +  </parent>
    +  <modelVersion>4.0.0</modelVersion>
    +
    +  <artifactId>httpcore5-reactive</artifactId>
    +  <name>Apache HttpComponents Core Reactive Extensions</name>
    +  <description>Apache HttpComponents Reactive Streams Bindings</description>
    +  <url>http://hc.apache.org/httpcomponents-core-ga</url>
    +  <packaging>jar</packaging>
    +
    +  <dependencies>
    +    <dependency>
    +      <groupId>org.apache.httpcomponents.core5</groupId>
    +      <artifactId>httpcore5</artifactId>
    +      <version>${project.version}</version>
    +      <scope>compile</scope>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.reactivestreams</groupId>
    +      <artifactId>reactive-streams</artifactId>
    +      <version>1.0.2</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>junit</groupId>
    +      <artifactId>junit</artifactId>
    +      <scope>test</scope>
    +    </dependency>
    +    <dependency>
    +      <groupId>io.reactivex.rxjava2</groupId>
    --- End diff --
    
    This was absolutely supposed to be test-only. I'll fix it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


[GitHub] httpcomponents-core pull request #74: HTTPCLIENT-1942: Add support for React...

Posted by ok2c <gi...@git.apache.org>.
Github user ok2c commented on a diff in the pull request:

    https://github.com/apache/httpcomponents-core/pull/74#discussion_r212394256
  
    --- Diff: httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * ====================================================================
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + * ====================================================================
    + *
    + * This software consists of voluntary contributions made by many
    + * individuals on behalf of the Apache Software Foundation.  For more
    + * information on the Apache Software Foundation, please see
    + * <http://www.apache.org/>.
    + *
    + */
    +package org.apache.hc.core5.reactive;
    +
    +import org.apache.hc.core5.annotation.Contract;
    +import org.apache.hc.core5.annotation.ThreadingBehavior;
    +import org.apache.hc.core5.concurrent.BasicFuture;
    +import org.apache.hc.core5.concurrent.FutureCallback;
    +import org.apache.hc.core5.http.EntityDetails;
    +import org.apache.hc.core5.http.Header;
    +import org.apache.hc.core5.http.HttpResponse;
    +import org.apache.hc.core5.http.Message;
    +import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
    +import org.apache.hc.core5.http.nio.CapacityChannel;
    +import org.apache.hc.core5.http.protocol.HttpContext;
    +import org.apache.hc.core5.util.Args;
    +import org.reactivestreams.Publisher;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.concurrent.Future;
    +
    +/**
    + * An {@link AsyncResponseConsumer} that publishes the response body through
    + * a {@link Publisher}, as defined by the Reactive Streams specification. The
    + * response is represented as a {@link Message} consisting of a {@link
    + * HttpResponse} representing the headers and a {@link Publisher} representing
    + * the response body as an asynchronous stream of {@link ByteBuffer} instances.
    + *
    + * @since 5.0
    + */
    +@Contract(threading = ThreadingBehavior.SAFE)
    +public final class ReactiveResponseConsumer implements AsyncResponseConsumer<Void> {
    +
    +    private final ReactiveDataConsumer reactiveDataConsumer = new ReactiveDataConsumer();
    +    private final List<Header> trailers = Collections.synchronizedList(new ArrayList<Header>());
    +    private final BasicFuture<Message<HttpResponse, Publisher<ByteBuffer>>> responseFuture;
    +
    +    private volatile BasicFuture<Void> responseCompletion;
    +    private volatile HttpResponse informationResponse;
    +    private volatile EntityDetails entityDetails;
    +
    +    /**
    +     * Creates a {@code ReactiveResponseConsumer}.
    +     */
    +    public ReactiveResponseConsumer() {
    +        this.responseFuture = new BasicFuture<>(null);
    +    }
    +
    +    /**
    +     * Creates a {@code ReactiveResponseConsumer} that will call back the supplied {@link FutureCallback} with a
    +     * streamable response.
    +     *
    +     * @param responseCallback the callback to invoke when the response is available for consumption.
    +     */
    +    public ReactiveResponseConsumer(final FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> responseCallback) {
    +        this.responseFuture = new BasicFuture<>(Args.notNull(responseCallback, "responseCallback"));
    +    }
    +
    +    public Future<Message<HttpResponse, Publisher<ByteBuffer>>> getResponseFuture() {
    +        return responseFuture;
    +    }
    +
    +    /**
    +     * Returns the intermediate (1xx) HTTP response if one was received.
    +     *
    +     * @return the information response, or {@code null} if none.
    +     */
    +    public HttpResponse getInformationResponse() {
    +        return informationResponse;
    +    }
    +
    +    /**
    +     * Returns the response entity details.
    +     *
    +     * @return the entity details, or {@code null} if none.
    +     */
    +    public EntityDetails getEntityDetails() {
    +        return entityDetails;
    +    }
    +
    +    /**
    +     * Returns the trailers received at the end of the response.
    +     *
    +     * @return a non-null list of zero or more trailers.
    +     */
    +    public List<Header> getTrailers() {
    +        return trailers;
    +    }
    +
    +    @Override
    +    public void consumeResponse(
    +        final HttpResponse response,
    +        final EntityDetails entityDetails,
    +        final HttpContext httpContext,
    +        final FutureCallback<Void> resultCallback
    +    ) {
    +        this.entityDetails = entityDetails;
    +        this.responseCompletion = new BasicFuture<>(resultCallback);
    +        this.responseFuture.completed(new Message<HttpResponse, Publisher<ByteBuffer>>(response, reactiveDataConsumer));
    --- End diff --
    
    @rschmitt Why not using a simple `Consumer` instead? Do we really need another `Future` here?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


[GitHub] httpcomponents-core pull request #74: HTTPCLIENT-1942: Add support for React...

Posted by rschmitt <gi...@git.apache.org>.
Github user rschmitt closed the pull request at:

    https://github.com/apache/httpcomponents-core/pull/74


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


[GitHub] httpcomponents-core pull request #74: HTTPCLIENT-1942: Add support for React...

Posted by ok2c <gi...@git.apache.org>.
Github user ok2c commented on a diff in the pull request:

    https://github.com/apache/httpcomponents-core/pull/74#discussion_r213206278
  
    --- Diff: httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java ---
    @@ -0,0 +1,175 @@
    +/*
    + * ====================================================================
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + * ====================================================================
    + *
    + * This software consists of voluntary contributions made by many
    + * individuals on behalf of the Apache Software Foundation.  For more
    + * information on the Apache Software Foundation, please see
    + * <http://www.apache.org/>.
    + *
    + */
    +package org.apache.hc.core5.reactive;
    +
    +import org.apache.hc.core5.annotation.Contract;
    +import org.apache.hc.core5.annotation.ThreadingBehavior;
    +import org.apache.hc.core5.http.ProtocolException;
    +import org.apache.hc.core5.http.nio.AsyncDataProducer;
    +import org.apache.hc.core5.http.nio.DataStreamChannel;
    +import org.apache.hc.core5.util.Args;
    +import org.reactivestreams.Publisher;
    +import org.reactivestreams.Subscriber;
    +import org.reactivestreams.Subscription;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayDeque;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * An asynchronous data producer that supports Reactive Streams.
    + *
    + * @since 5.0
    + */
    +@Contract(threading = ThreadingBehavior.SAFE)
    +final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBuffer> {
    +
    +    private static final int BUFFER_WINDOW_SIZE = 5;
    +
    +    private final AtomicReference<DataStreamChannel> requestChannel = new AtomicReference<>();
    +    private final AtomicReference<Throwable> exception = new AtomicReference<>(null);
    +    private final AtomicBoolean complete = new AtomicBoolean(false);
    +    private final Publisher<ByteBuffer> publisher;
    +    private final AtomicReference<Subscription> subscription = new AtomicReference<>(null);
    +    private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(); // This field requires synchronization
    +
    +    public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
    +        this.publisher = Args.notNull(publisher, "publisher");
    +    }
    +
    +    @Override
    +    public void onSubscribe(final Subscription subscription) {
    +        if (this.subscription.getAndSet(subscription) != null) {
    +            throw new IllegalStateException("Already subscribed");
    +        }
    +
    +        subscription.request(BUFFER_WINDOW_SIZE);
    +    }
    +
    +    @Override
    +    public void onNext(final ByteBuffer byteBuffer) {
    +        final byte[] copy = new byte[byteBuffer.remaining()];
    +        byteBuffer.get(copy);
    +        synchronized (buffers) {
    +            buffers.add(ByteBuffer.wrap(copy));
    +        }
    +        signalReadiness();
    +    }
    +
    +    @Override
    +    public void onError(final Throwable throwable) {
    +        subscription.set(null);
    +        exception.set(throwable);
    +        signalReadiness();
    +    }
    +
    +    @Override
    +    public void onComplete() {
    +        subscription.set(null);
    +        complete.set(true);
    +        signalReadiness();
    +    }
    +
    +    private void signalReadiness() {
    +        final DataStreamChannel channel = requestChannel.get();
    +        if (channel == null) {
    +            throw new IllegalStateException("Output channel is not set");
    +        }
    +        channel.requestOutput();
    +    }
    +
    +    @Override
    +    public int available() {
    +        if (exception.get() != null || complete.get()) {
    +            return 1;
    +        } else {
    +            synchronized (buffers) {
    +                int sum = 0;
    +                for (final ByteBuffer buffer : buffers) {
    +                    sum += buffer.remaining();
    +                }
    +                return sum;
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void produce(final DataStreamChannel channel) throws IOException {
    +        if (requestChannel.get() == null) {
    +            requestChannel.set(channel);
    +            publisher.subscribe(this);
    +        }
    +
    +        final Throwable t = exception.get();
    +        final Subscription s = subscription.get();
    +        int buffersToReplenish = 0;
    +        try {
    +            synchronized (buffers) {
    +                if (t != null) {
    +                    // TODO: We need a reliable way to send RST_STREAM (without subsequent GOAWAY) on HTTP/2.
    --- End diff --
    
    @rschmitt I'll refactor that bit once the change-set gets merged.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


[GitHub] httpcomponents-core issue #74: HTTPCLIENT-1942: Add support for Reactive Str...

Posted by ok2c <gi...@git.apache.org>.
Github user ok2c commented on the issue:

    https://github.com/apache/httpcomponents-core/pull/74
  
    @rschmitt Committed to master as 1bae9643c912536793e820ac546730871e37155a. Please review and close this PR.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


[GitHub] httpcomponents-core pull request #74: HTTPCLIENT-1942: Add support for React...

Posted by ok2c <gi...@git.apache.org>.
Github user ok2c commented on a diff in the pull request:

    https://github.com/apache/httpcomponents-core/pull/74#discussion_r212393518
  
    --- Diff: httpcore5-reactive/pom.xml ---
    @@ -0,0 +1,87 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  ====================================================================
    +  Licensed to the Apache Software Foundation (ASF) under one
    +  or more contributor license agreements.  See the NOTICE file
    +  distributed with this work for additional information
    +  regarding copyright ownership.  The ASF licenses this file
    +  to you under the Apache License, Version 2.0 (the
    +  "License"); you may not use this file except in compliance
    +  with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +  Unless required by applicable law or agreed to in writing,
    +  software distributed under the License is distributed on an
    +  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +  KIND, either express or implied.  See the License for the
    +  specific language governing permissions and limitations
    +  under the License.
    +  ====================================================================
    +
    +  This software consists of voluntary contributions made by many
    +  individuals on behalf of the Apache Software Foundation.  For more
    +  information on the Apache Software Foundation, please see
    +  <http://www.apache.org />.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +  <parent>
    +    <artifactId>httpcore5-parent</artifactId>
    +    <groupId>org.apache.httpcomponents.core5</groupId>
    +    <version>5.0-beta3-SNAPSHOT</version>
    +  </parent>
    +  <modelVersion>4.0.0</modelVersion>
    +
    +  <artifactId>httpcore5-reactive</artifactId>
    +  <name>Apache HttpComponents Core Reactive Extensions</name>
    +  <description>Apache HttpComponents Reactive Streams Bindings</description>
    +  <url>http://hc.apache.org/httpcomponents-core-ga</url>
    +  <packaging>jar</packaging>
    +
    +  <dependencies>
    +    <dependency>
    +      <groupId>org.apache.httpcomponents.core5</groupId>
    +      <artifactId>httpcore5</artifactId>
    +      <version>${project.version}</version>
    +      <scope>compile</scope>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.reactivestreams</groupId>
    +      <artifactId>reactive-streams</artifactId>
    +      <version>1.0.2</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>junit</groupId>
    +      <artifactId>junit</artifactId>
    +      <scope>test</scope>
    +    </dependency>
    +    <dependency>
    +      <groupId>io.reactivex.rxjava2</groupId>
    --- End diff --
    
    @rschmitt Can we remove compile dependency on `io.reactivex.rxjava2:rxjava` and make it test only or at least optional?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org