You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hc.apache.org by Roy Hashimoto <ro...@gmail.com> on 2019/09/06 16:32:04 UTC

throttling calls to AsyncEntityProducer in HttpCore 5

I'm playing with asynchronous handlers in HttpCore 5, and I'd like to have
an AsyncEntityProducer write data at its own (slow) rate like in this old
thread <https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2>.
Writing to the DataStreamChannel whenever I want - outside the scope of a
produce() method call - works fine, but I notice that produce() is being
called every 5-6 milliseconds which ideally I would like to eliminate or
reduce.

The answer in the old thread was to use IOControl.suspendOutput() and
IOControl.requestOutput(), but this class appears no longer to be in
HttpCore 5. I see that there is a DataStreamChannel.requestOutput() but I
haven't figured out what suspension call that should be paired with. I have
tried simply returning 0 from my AsyncEntityProducer.available() override,
but that doesn't seem to be it.

Is there a new way to suspend/resume output in HttpCore 5?

Thanks!
Roy

Kotlin source here
<https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250>.

Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Oleg Kalnichevski <ol...@apache.org>.
On Mon, 2019-09-09 at 22:49 +0200, Oleg Kalnichevski wrote:
> On Mon, 2019-09-09 at 12:34 -0700, Roy Hashimoto wrote:
> > > 
> > > Read / write operations on IOSession are non-blocking and never
> > > block.
> > 
> > 
> > Ah, I see. I was thrown off by the read call returning the number
> > of
> > bytes
> > read. I was expecting a non-blocking call to return a future or
> > take
> > a
> > handler.
> > 
> > Would you like me to file a bug for the consume() throttling?
> > 
> > Roy
> > 
> 
> Please do so, but I need to analyze the problem a bit more to confirm
> this is a bug.
> 

I can confirm this is a bug. Please raise a high priority JIRA issue
for this defect.

Oleg



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Oleg Kalnichevski <ol...@apache.org>.
On Mon, 2019-09-09 at 12:34 -0700, Roy Hashimoto wrote:
> > 
> > Read / write operations on IOSession are non-blocking and never
> > block.
> 
> 
> Ah, I see. I was thrown off by the read call returning the number of
> bytes
> read. I was expecting a non-blocking call to return a future or take
> a
> handler.
> 
> Would you like me to file a bug for the consume() throttling?
> 
> Roy
> 

Please do so, but I need to analyze the problem a bit more to confirm
this is a bug.

Cheers

Oleg


> On Mon, Sep 9, 2019 at 11:54 AM Oleg Kalnichevski <ol...@apache.org>
> wrote:
> 
> > On Mon, 2019-09-09 at 11:45 -0700, Roy Hashimoto wrote:
> > > I have observed it clearing the bit. I have seen the blocking
> > > read
> > > here
> > > <
> > > 
> > 
> > 
https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L271
> > > > 
> > > 
> > > happen when the Selector interestOps is 0, so the Selector does
> > > not
> > > seem to
> > > be gating the read.
> > > 
> > 
> > Read / write operations on IOSession are non-blocking and never
> > block.
> > 
> > Oleg
> > 
> > > Roy
> > > 
> > > On Mon, Sep 9, 2019 at 11:37 AM Oleg Kalnichevski <
> > > olegk@apache.org>
> > > wrote:
> > > 
> > > > On Mon, 2019-09-09 at 11:01 -0700, Roy Hashimoto wrote:
> > > > > If this is a real issue, I believe it happens with
> > > > > AbstractHttp1StreamDuplexer when consuming entities on both
> > > > > server
> > > > > and client. Here is a sample server program to investigate
> > > > > the
> > > > > behavior. It would probably be easier to test with a client
> > > > > program
> > > > > but I've been using the server API so that was quicker for me
> > > > > to
> > > > > write.
> > > > > 
> > > > > The sample is pretty minimal. All I've done is set the HTTP1
> > > > > initial
> > > > > window size to 8192 and add an AsyncExchangeHandler that
> > > > > creates
> > > > > an
> > > > > empty response and logs the AsyncDataConsumer method
> > > > > implementations
> > > > > to the console. Note that I intentionally never update the
> > > > > CapacityChannel - I have effectively made an infinitely slow
> > > > > consumer.
> > > > > 
> > > > > To test, post data of more than 8 KB to the server (e.g.
> > > > > using
> > > > > curl).
> > > > > I expect to see in the log consume() calls totaling at least
> > > > > 8192
> > > > > and
> > > > > nothing after that until the socket times out. Instead, what
> > > > > I
> > > > > see is
> > > > > consume() calls delivering all the data and then streamEnd().
> > > > > 
> > > > > Here's sample output for uploading almost 32 KB. I have used
> > > > > my
> > > > > IDE
> > > > > to add a logging breakpoint to show the capacity window on
> > > > > this
> > > > > line,
> > > > > so those are also included in the output below (in green if
> > > > > you're
> > > > > viewing HTML mail).
> > > > > 
> > > > > Mon Sep 09, 2019 10:27:18.893 AM SlowConsumerTest main FINE:
> > > > > Listening on /0:0:0:0:0:0:0:0:8080
> > > > > Mon Sep 09, 2019 10:27:31.294 AM
> > > > > SlowConsumerTest$MyExchangeHandler
> > > > > handleRequest FINEST: handleRequest called
> > > > > Mon Sep 09, 2019 10:27:31.332 AM
> > > > > SlowConsumerTest$MyExchangeHandler
> > > > > consume FINEST: consume 8192 total 8192
> > > > > AbstractHttp1StreamDuplexer:313 capacity=0
> > > > > Mon Sep 09, 2019 10:27:31.341 AM
> > > > > SlowConsumerTest$MyExchangeHandler
> > > > > updateCapacity FINEST: updateCapacity called
> > > > > Mon Sep 09, 2019 10:27:31.342 AM
> > > > > SlowConsumerTest$MyExchangeHandler
> > > > > consume FINEST: consume 8192 total 16384
> > > > > AbstractHttp1StreamDuplexer:313 capacity=-8192
> > > > > Mon Sep 09, 2019 10:27:31.346 AM
> > > > > SlowConsumerTest$MyExchangeHandler
> > > > > updateCapacity FINEST: updateCapacity called
> > > > > Mon Sep 09, 2019 10:27:31.348 AM
> > > > > SlowConsumerTest$MyExchangeHandler
> > > > > consume FINEST: consume 8192 total 24576
> > > > > AbstractHttp1StreamDuplexer:313 capacity=-16384
> > > > > Mon Sep 09, 2019 10:27:31.352 AM
> > > > > SlowConsumerTest$MyExchangeHandler
> > > > > updateCapacity FINEST: updateCapacity called
> > > > > Mon Sep 09, 2019 10:27:31.354 AM
> > > > > SlowConsumerTest$MyExchangeHandler
> > > > > consume FINEST: consume 8150 total 32726
> > > > > AbstractHttp1StreamDuplexer:313 capacity=-24534
> > > > > Mon Sep 09, 2019 10:27:31.357 AM
> > > > > SlowConsumerTest$MyExchangeHandler
> > > > > streamEnd FINEST: streamEnd called
> > > > > 
> > > > > My expectation is that there would no calls to consume() once
> > > > > capacity reached 0 and we would never see streamEnd().
> > > > > 
> > > > 
> > > > Roy
> > > > 
> > > > My expectations would be the same. The protocol handler should
> > > > be
> > > > clearing read interest if the capacity window drops below zero,
> > > > see:
> > > > 
> > > > 
> > > > 
> > 
> > 
https://github.com/apache/httpcomponents-core/blob/master/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java#L609
> > > > 
> > > > I will investigate why it does not happen.
> > > > 
> > > > Oleg
> > > > 
> > > > > Roy
> > > > > 
> > > > > On Mon, Sep 9, 2019 at 8:10 AM Oleg Kalnichevski <
> > > > > olegk@apache.org>
> > > > > wrote:
> > > > > > On Mon, 2019-09-09 at 07:58 -0700, Roy Hashimoto wrote:
> > > > > > > > 
> > > > > > > > > 
> > > > > > > > 
> > > > > > > > 
> > > > > > 
> > > > > > client.start(Http1Config.custom().setBufferSize(256).setIni
> > > > > > tial
> > > > > > Wind
> > > > > > > > owSize(32).build());
> > > > > > > > > 
> > > > > > > > 
> > > > > > > > This change alone will have little effect without
> > > > > > > > adjusting
> > > > > > > > the
> > > > > > > > session
> > > > > > > > buffer side as well. Given that the default session
> > > > > > > > buffer
> > > > > > > > size
> > > > > > 
> > > > > > is
> > > > > > > > 8192
> > > > > > > > with 32 byte initial capacity window one is likely to
> > > > > > > > get
> > > > > > 
> > > > > > multiple
> > > > > > > > #consume invocations with negative capacity.
> > > > > > > 
> > > > > > > 
> > > > > > > Hmm. Isn't setBufferSize(256) for the session buffer?
> > > > > > > Also,
> > > > > > > if I
> > > > > > > increase
> > > > > > > the amount of request data in the test to more than 8 KB:
> > > > > > > 
> > > > > > >   return new MultiLineResponseHandler("0123456789abcd",
> > > > > > > 1000);
> > > > > > > 
> > > > > > > Then I can see the capacity window go far more negative
> > > > > > > than
> > > > > > 
> > > > > > -8192,
> > > > > > > which I
> > > > > > > wouldn't expect with your explanation.
> > > > > > > 
> > > > > > > It looks like AbstractHttp1StreamDuplexer reads its data
> > > > > > > with
> > > > > > > a
> > > > > > > blocking
> > > > > > > read without explicitly waiting on the NIO Selector
> > > > > > > configured by
> > > > > > > CapacityChannel (unless I missed it in the code). Does
> > > > > > > this
> > > > > > > type
> > > > > > 
> > > > > > of
> > > > > > > read
> > > > > > > implicitly use the Selector? It doesn't appear to me that
> > > > > > > it
> > > > > > 
> > > > > > does.
> > > > > > > For
> > > > > > > example, I can reach this line
> > > > > > > <
> > > > > > > 
> > > > 
> > > > 
> > 
> > 
https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L272
> > > > > > > > 
> > > > > > > 
> > > > > > > right
> > > > > > > after the blocking read() when
> > > > > > > session.ioSession.key.interestOps
> > > > > > 
> > > > > > is
> > > > > > > 0.
> > > > > > > 
> > > > > > > Don't waste time on this if I'm not making sense. I'm not
> > > > > > > that
> > > > > > > familiar
> > > > > > > with the NIO API, and I don't have a use case that
> > > > > > > depends on
> > > > > > 
> > > > > > this.
> > > > > > > Just
> > > > > > > wanted to bring it to someone's attention in case it
> > > > > > > indicated an
> > > > > > > issue.
> > > > > > > 
> > > > > > > Roy
> > > > > > 
> > > > > > Roy
> > > > > > 
> > > > > > I am not sure we are talking about the same things because
> > > > > > it
> > > > > > do
> > > > > > not
> > > > > > even understand if you are talking about the server side,
> > > > > > client
> > > > > > side
> > > > > > or both. Can you put together a test application that
> > > > > > reproduces
> > > > > > the
> > > > > > issue and describe what think is wrong?
> > > > > > 
> > > > > > Oleg
> > > > > > 
> > > > > > > 
> > > > > > > On Mon, Sep 9, 2019 at 2:02 AM Oleg Kalnichevski <
> > > > > > 
> > > > > > olegk@apache.org>
> > > > > > > wrote:
> > > > > > > 
> > > > > > > > On Sun, 2019-09-08 at 17:07 -0700, Roy Hashimoto wrote:
> > > > > > > > > By looking at the suggested 5.0 examples I was able
> > > > > > > > > to
> > > > > > > > > get an
> > > > > > > > > AsyncServerExchangeHandler subclass to play nicely
> > > > > > > > > with
> > > > > > 
> > > > > > Kotlin
> > > > > > > > > coroutines
> > > > > > > > > on the AsyncDataProducer side of things, i.e.
> > > > > > > > > minimizing
> > > > > > > > > produce()
> > > > > > > > > polling
> > > > > > > > > and avoiding buffering.
> > > > > > > > > 
> > > > > > > > > I haven't been as successful with throttling calls on
> > > > > > > > > the
> > > > > > > > > AsyncDataConsumer
> > > > > > > > > side, i.e. consume() calls keep being made even
> > > > > > > > > though
> > > > > > > > > the
> > > > > > > > > capacity
> > > > > > > > > window
> > > > > > > > > has gone negative. I think this might be the expected
> > > > > > 
> > > > > > behavior
> > > > > > > > > because of
> > > > > > > > > this comment in AbstractHttp1StreamDuplexer:
> > > > > > > > > 
> > > > > > > > >         // At present the consumer can be forced to
> > > > > > > > > consume
> > > > > > 
> > > > > > data
> > > > > > > > >         // over its declared capacity in order to
> > > > > > > > > avoid
> > > > > > 
> > > > > > having
> > > > > > > > >         // unprocessed message body content stuck in
> > > > > > > > > the
> > > > > > 
> > > > > > session
> > > > > > > > >         // input buffer
> > > > > > > > > 
> > > > > > > > > Does that refer to just the case where the capacity
> > > > > > > > > starts
> > > > > > > > > positive
> > > > > > > > > but
> > > > > > > > > data exceeding the capacity is delivered to
> > > > > > > > > consume()? Or
> > > > > > 
> > > > > > does it
> > > > > > > > > refer to
> > > > > > > > > the behavior I see, which is that capacity updates
> > > > > > > > > (or
> > > > > > > > > the
> > > > > > 
> > > > > > lack
> > > > > > > > > of
> > > > > > > > > them)
> > > > > > > > > don't seem to have any effect for HTTP/1.1?
> > > > > > > > > 
> > > > > > > > 
> > > > > > > > It is the former. Whatever data read from the
> > > > > > > > underlying
> > > > > > > > socket
> > > > > > > > channel
> > > > > > > > into the session buffer will be force-fed into the
> > > > > > > > consumer
> > > > > > > > regardless
> > > > > > > > of its declared capacity for HTTP/1.1 connections.
> > > > > > > > 
> > > > > > > > > I've also tried running the
> > > > > > > > > Http1IntegrationTest.testSlowResponseConsumer()
> > > > > > > > > test, substituting this line to trigger
> > > > > > > > > updateCapacity()
> > > > > > 
> > > > > > calls:
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > 
> > > > > > client.start(Http1Config.custom().setBufferSize(256).setIni
> > > > > > tial
> > > > > > Wi
> > > > > > > > > ndow
> > > > > > > > > Size(32).build());
> > > > > > > > > 
> > > > > > > > 
> > > > > > > > This change alone will have little effect without
> > > > > > > > adjusting
> > > > > > > > the
> > > > > > > > session
> > > > > > > > buffer side as well. Given that the default session
> > > > > > > > buffer
> > > > > > > > size
> > > > > > 
> > > > > > is
> > > > > > > > 8192
> > > > > > > > with 32 byte initial capacity window one is likely to
> > > > > > > > get
> > > > > > 
> > > > > > multiple
> > > > > > > > #consume invocations with negative capacity.
> > > > > > > > 
> > > > > > > > 
> > > > > > > > > By adding the small initial window, I can see the
> > > > > > 
> > > > > > capacityWindow
> > > > > > > > > going more
> > > > > > > > > and more negative on each consume() call with all the
> > > > > > > > > data
> > > > > > > > > buffered
> > > > > > > > > before
> > > > > > > > > the test code completes its first sleep.
> > > > > > > > > 
> > > > > > > > > I don't have a specific use case for a slow consumer,
> > > > > > > > > just
> > > > > > 
> > > > > > want
> > > > > > > > > to
> > > > > > > > > know if
> > > > > > > > > I'm misunderstanding something.
> > > > > > > > > 
> > > > > > > > 
> > > > > > > > I realized that we likely need to adjust the session
> > > > > > > > buffer
> > > > > > 
> > > > > > size
> > > > > > > > automatically when the initial window setting is below
> > > > > > > > than
> > > > > > 
> > > > > > value.
> > > > > > > > 
> > > > > > > > Hope this helps
> > > > > > > > 
> > > > > > > > Oleg
> > > > > > > > 
> > > > > > > > > Thanks!
> > > > > > > > > Roy
> > > > > > > > > 
> > > > > > > > > On Fri, Sep 6, 2019 at 10:15 AM Roy Hashimoto <
> > > > > > > > > roy.hashimoto@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > 
> > > > > > > > > > Those are good leads, I'll pursue them.
> > > > > > > > > > 
> > > > > > > > > > Thanks!
> > > > > > > > > > Roy
> > > > > > > > > > 
> > > > > > > > > > On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski <
> > > > > > > > > > olegk@apache.org>
> > > > > > > > > > wrote:
> > > > > > > > > > 
> > > > > > > > > > > On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt
> > > > > > > > > > > wrote:
> > > > > > > > > > > > Have you looked at the reactive extensions for
> > > > > > 
> > > > > > HttpCore5?
> > > > > > > > > > > > They
> > > > > > > > > > > > demonstrate
> > > > > > > > > > > > how to implement
> > > > > > > > > > > > AsyncEntityProducer/AsyncDataProducer
> > > > > > 
> > > > > > with
> > > > > > > > > > > > support
> > > > > > > > > > > > for
> > > > > > > > > > > > backpressure (or you can just use the Reactive
> > > > > > > > > > > > Streams
> > > > > > 
> > > > > > API
> > > > > > > > > > > > instead):
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > 
> > > > > > > > 
> > > > 
> > > > 
> > 
> > 
https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > Just a bit of background. In 5.0 one can no
> > > > > > > > > > > longer
> > > > > > > > > > > assume
> > > > > > > > > > > that
> > > > > > > > > > > one
> > > > > > > > > > > message exchange has exclusive ownership of the
> > > > > > 
> > > > > > underlying
> > > > > > > > > > > connection.
> > > > > > > > > > > Multiplexed message exchanges in HTTP/2 and
> > > > > > > > > > > piplelined
> > > > > > > > > > > message
> > > > > > > > > > > exchanges in HTTP/1.1 must not block other
> > > > > > > > > > > concurrent
> > > > > > > > > > > exchanges.
> > > > > > > > > > > Message changes however can update their current
> > > > > > > > > > > capacity
> > > > > > 
> > > > > > via
> > > > > > > > > > > `CapacityChannel`. Reactive extensions is a great
> > > > > > > > > > > example
> > > > > > 
> > > > > > and
> > > > > > > > > > > also an
> > > > > > > > > > > alternative to the native APIs per Ryan's
> > > > > > > > > > > recommendation.
> > > > > > > > > > > 
> > > > > > > > > > > If you prefer the native APIs you can take a look
> > > > > > > > > > > at
> > > > > > > > > > > the
> > > > > > > > > > > classic
> > > > > > > > > > > I/O
> > > > > > > > > > > adaptors that essentially emulate the classic
> > > > > > > > > > > blocking
> > > > > > 
> > > > > > i/o on
> > > > > > > > > > > top
> > > > > > > > > > > of
> > > > > > > > > > > the new async APIs [1] or HTTP/1.1 integration
> > > > > > > > > > > tests
> > > > > > > > > > > [2]
> > > > > > 
> > > > > > that
> > > > > > > > > > > have a
> > > > > > > > > > > number of 'slow' consumer / producer test cases.
> > > > > > > > > > > 
> > > > > > > > > > > Cheers
> > > > > > > > > > > 
> > > > > > > > > > > Oleg
> > > > > > > > > > > 
> > > > > > > > > > > [1]
> > > > > > > > > > > 
> > > > > > > > 
> > > > > > > > 
> > > > 
> > > > 
> > 
> > 
https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic
> > > > > > > > > > > [2]
> > > > > > > > > > > 
> > > > > > > > 
> > > > > > > > 
> > > > 
> > > > 
> > 
> > 
https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto <
> > > > > > > > > > > > roy.hashimoto@gmail.com
> > > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > 
> > > > > > > > > > > > > I'm playing with asynchronous handlers in
> > > > > > > > > > > > > HttpCore 5,
> > > > > > 
> > > > > > and
> > > > > > > > > > > > > I'd
> > > > > > > > > > > > > like
> > > > > > > > > > > > > to have
> > > > > > > > > > > > > an AsyncEntityProducer write data at its own
> > > > > > > > > > > > > (slow)
> > > > > > 
> > > > > > rate
> > > > > > > > > > > > > like
> > > > > > > > > > > > > in
> > > > > > > > > > > > > this old
> > > > > > > > > > > > > thread <
> > > > > > > > > > > > > 
> > > > > > > > 
> > > > > > > > 
> > > > > > 
> > > > > > 
> > 
> > https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2
> > > > > > > > > > > > > > .
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Writing to the DataStreamChannel whenever I
> > > > > > > > > > > > > want
> > > > > > > > > > > > > -
> > > > > > > > > > > > > outside
> > > > > > > > > > > > > the
> > > > > > > > > > > > > scope of a
> > > > > > > > > > > > > produce() method call - works fine, but I
> > > > > > > > > > > > > notice
> > > > > > > > > > > > > that
> > > > > > > > > > > > > produce() is
> > > > > > > > > > > > > being
> > > > > > > > > > > > > called every 5-6 milliseconds which ideally I
> > > > > > > > > > > > > would
> > > > > > 
> > > > > > like
> > > > > > > > > > > > > to
> > > > > > > > > > > > > eliminate or
> > > > > > > > > > > > > reduce.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > The answer in the old thread was to use
> > > > > > > > > > > > > IOControl.suspendOutput()
> > > > > > > > > > > > > and
> > > > > > > > > > > > > IOControl.requestOutput(), but this class
> > > > > > > > > > > > > appears
> > > > > > > > > > > > > no
> > > > > > > > > > > > > longer
> > > > > > > > > > > > > to be
> > > > > > > > > > > > > in
> > > > > > > > > > > > > HttpCore 5. I see that there is a
> > > > > > > > > > > > > DataStreamChannel.requestOutput()
> > > > > > > > > > > > > but I
> > > > > > > > > > > > > haven't figured out what suspension call that
> > > > > > > > > > > > > should
> > > > > > 
> > > > > > be
> > > > > > > > > > > > > paired
> > > > > > > > > > > > > with. I have
> > > > > > > > > > > > > tried simply returning 0 from my
> > > > > > > > > > > > > AsyncEntityProducer.available()
> > > > > > > > > > > > > override,
> > > > > > > > > > > > > but that doesn't seem to be it.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Is there a new way to suspend/resume output
> > > > > > > > > > > > > in
> > > > > > 
> > > > > > HttpCore
> > > > > > > > > > > > > 5?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Thanks!
> > > > > > > > > > > > > Roy
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Kotlin source here
> > > > > > > > > > > > > <
> > > > > > > > > > > > > 
> > > > > > > > 
> > > > > > > > 
> > > > > > 
> > > > > > 
> > 
> > https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250
> > > > > > > > > > > > > > .
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > -----------------------------------------------
> > > > > > > > > > > ----
> > > > > > > > > > > ----
> > > > > > 
> > > > > > ----
> > > > > > > > > > > ----
> > > > > > > > > > > ------
> > > > > > > > > > > To unsubscribe, e-mail: 
> > > > > > > > > > > dev-unsubscribe@hc.apache.org
> > > > > > > > > > > For additional commands, e-mail:
> > > > > > > > > > > dev-help@hc.apache.org
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > 
> > > > > > > > 
> > > > > > > > -----------------------------------------------------
> > > > > > > > ----
> > > > > > > > ----
> > > > > > 
> > > > > > ----
> > > > > > > > ----
> > > > > > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > > > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > > > > > > 
> > > > > > > > 
> > > > > > 
> > > > > > 
> > > > > > ---------------------------------------------------------
> > > > > > ----
> > > > > > ----
> > > > > > ----
> > > > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > > > > 
> > > > > 
> > > > > -----------------------------------------------------------
> > > > > ----
> > > > > ------
> > > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > > 
> > > > 
> > > > -------------------------------------------------------------
> > > > ----
> > > > ----
> > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > > 
> > > > 
> > 
> > 
> > -----------------------------------------------------------------
> > ----
> > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > For additional commands, e-mail: dev-help@hc.apache.org
> > 
> > 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Roy Hashimoto <ro...@gmail.com>.
>
> Read / write operations on IOSession are non-blocking and never block.


Ah, I see. I was thrown off by the read call returning the number of bytes
read. I was expecting a non-blocking call to return a future or take a
handler.

Would you like me to file a bug for the consume() throttling?

Roy

On Mon, Sep 9, 2019 at 11:54 AM Oleg Kalnichevski <ol...@apache.org> wrote:

> On Mon, 2019-09-09 at 11:45 -0700, Roy Hashimoto wrote:
> > I have observed it clearing the bit. I have seen the blocking read
> > here
> > <
> >
> https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L271
> > >
> > happen when the Selector interestOps is 0, so the Selector does not
> > seem to
> > be gating the read.
> >
>
> Read / write operations on IOSession are non-blocking and never block.
>
> Oleg
>
> > Roy
> >
> > On Mon, Sep 9, 2019 at 11:37 AM Oleg Kalnichevski <ol...@apache.org>
> > wrote:
> >
> > > On Mon, 2019-09-09 at 11:01 -0700, Roy Hashimoto wrote:
> > > > If this is a real issue, I believe it happens with
> > > > AbstractHttp1StreamDuplexer when consuming entities on both
> > > > server
> > > > and client. Here is a sample server program to investigate the
> > > > behavior. It would probably be easier to test with a client
> > > > program
> > > > but I've been using the server API so that was quicker for me to
> > > > write.
> > > >
> > > > The sample is pretty minimal. All I've done is set the HTTP1
> > > > initial
> > > > window size to 8192 and add an AsyncExchangeHandler that creates
> > > > an
> > > > empty response and logs the AsyncDataConsumer method
> > > > implementations
> > > > to the console. Note that I intentionally never update the
> > > > CapacityChannel - I have effectively made an infinitely slow
> > > > consumer.
> > > >
> > > > To test, post data of more than 8 KB to the server (e.g. using
> > > > curl).
> > > > I expect to see in the log consume() calls totaling at least 8192
> > > > and
> > > > nothing after that until the socket times out. Instead, what I
> > > > see is
> > > > consume() calls delivering all the data and then streamEnd().
> > > >
> > > > Here's sample output for uploading almost 32 KB. I have used my
> > > > IDE
> > > > to add a logging breakpoint to show the capacity window on this
> > > > line,
> > > > so those are also included in the output below (in green if
> > > > you're
> > > > viewing HTML mail).
> > > >
> > > > Mon Sep 09, 2019 10:27:18.893 AM SlowConsumerTest main FINE:
> > > > Listening on /0:0:0:0:0:0:0:0:8080
> > > > Mon Sep 09, 2019 10:27:31.294 AM
> > > > SlowConsumerTest$MyExchangeHandler
> > > > handleRequest FINEST: handleRequest called
> > > > Mon Sep 09, 2019 10:27:31.332 AM
> > > > SlowConsumerTest$MyExchangeHandler
> > > > consume FINEST: consume 8192 total 8192
> > > > AbstractHttp1StreamDuplexer:313 capacity=0
> > > > Mon Sep 09, 2019 10:27:31.341 AM
> > > > SlowConsumerTest$MyExchangeHandler
> > > > updateCapacity FINEST: updateCapacity called
> > > > Mon Sep 09, 2019 10:27:31.342 AM
> > > > SlowConsumerTest$MyExchangeHandler
> > > > consume FINEST: consume 8192 total 16384
> > > > AbstractHttp1StreamDuplexer:313 capacity=-8192
> > > > Mon Sep 09, 2019 10:27:31.346 AM
> > > > SlowConsumerTest$MyExchangeHandler
> > > > updateCapacity FINEST: updateCapacity called
> > > > Mon Sep 09, 2019 10:27:31.348 AM
> > > > SlowConsumerTest$MyExchangeHandler
> > > > consume FINEST: consume 8192 total 24576
> > > > AbstractHttp1StreamDuplexer:313 capacity=-16384
> > > > Mon Sep 09, 2019 10:27:31.352 AM
> > > > SlowConsumerTest$MyExchangeHandler
> > > > updateCapacity FINEST: updateCapacity called
> > > > Mon Sep 09, 2019 10:27:31.354 AM
> > > > SlowConsumerTest$MyExchangeHandler
> > > > consume FINEST: consume 8150 total 32726
> > > > AbstractHttp1StreamDuplexer:313 capacity=-24534
> > > > Mon Sep 09, 2019 10:27:31.357 AM
> > > > SlowConsumerTest$MyExchangeHandler
> > > > streamEnd FINEST: streamEnd called
> > > >
> > > > My expectation is that there would no calls to consume() once
> > > > capacity reached 0 and we would never see streamEnd().
> > > >
> > >
> > > Roy
> > >
> > > My expectations would be the same. The protocol handler should be
> > > clearing read interest if the capacity window drops below zero,
> > > see:
> > >
> > >
> > >
>
> https://github.com/apache/httpcomponents-core/blob/master/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java#L609
> > >
> > > I will investigate why it does not happen.
> > >
> > > Oleg
> > >
> > > > Roy
> > > >
> > > > On Mon, Sep 9, 2019 at 8:10 AM Oleg Kalnichevski <
> > > > olegk@apache.org>
> > > > wrote:
> > > > > On Mon, 2019-09-09 at 07:58 -0700, Roy Hashimoto wrote:
> > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > >
> > > > > client.start(Http1Config.custom().setBufferSize(256).setInitial
> > > > > Wind
> > > > > > > owSize(32).build());
> > > > > > > >
> > > > > > >
> > > > > > > This change alone will have little effect without adjusting
> > > > > > > the
> > > > > > > session
> > > > > > > buffer side as well. Given that the default session buffer
> > > > > > > size
> > > > >
> > > > > is
> > > > > > > 8192
> > > > > > > with 32 byte initial capacity window one is likely to get
> > > > >
> > > > > multiple
> > > > > > > #consume invocations with negative capacity.
> > > > > >
> > > > > >
> > > > > > Hmm. Isn't setBufferSize(256) for the session buffer? Also,
> > > > > > if I
> > > > > > increase
> > > > > > the amount of request data in the test to more than 8 KB:
> > > > > >
> > > > > >   return new MultiLineResponseHandler("0123456789abcd",
> > > > > > 1000);
> > > > > >
> > > > > > Then I can see the capacity window go far more negative than
> > > > >
> > > > > -8192,
> > > > > > which I
> > > > > > wouldn't expect with your explanation.
> > > > > >
> > > > > > It looks like AbstractHttp1StreamDuplexer reads its data with
> > > > > > a
> > > > > > blocking
> > > > > > read without explicitly waiting on the NIO Selector
> > > > > > configured by
> > > > > > CapacityChannel (unless I missed it in the code). Does this
> > > > > > type
> > > > >
> > > > > of
> > > > > > read
> > > > > > implicitly use the Selector? It doesn't appear to me that it
> > > > >
> > > > > does.
> > > > > > For
> > > > > > example, I can reach this line
> > > > > > <
> > > > > >
> > >
> > >
>
> https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L272
> > > > > > >
> > > > > >
> > > > > > right
> > > > > > after the blocking read() when
> > > > > > session.ioSession.key.interestOps
> > > > >
> > > > > is
> > > > > > 0.
> > > > > >
> > > > > > Don't waste time on this if I'm not making sense. I'm not
> > > > > > that
> > > > > > familiar
> > > > > > with the NIO API, and I don't have a use case that depends on
> > > > >
> > > > > this.
> > > > > > Just
> > > > > > wanted to bring it to someone's attention in case it
> > > > > > indicated an
> > > > > > issue.
> > > > > >
> > > > > > Roy
> > > > >
> > > > > Roy
> > > > >
> > > > > I am not sure we are talking about the same things because it
> > > > > do
> > > > > not
> > > > > even understand if you are talking about the server side,
> > > > > client
> > > > > side
> > > > > or both. Can you put together a test application that
> > > > > reproduces
> > > > > the
> > > > > issue and describe what think is wrong?
> > > > >
> > > > > Oleg
> > > > >
> > > > > >
> > > > > > On Mon, Sep 9, 2019 at 2:02 AM Oleg Kalnichevski <
> > > > >
> > > > > olegk@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > On Sun, 2019-09-08 at 17:07 -0700, Roy Hashimoto wrote:
> > > > > > > > By looking at the suggested 5.0 examples I was able to
> > > > > > > > get an
> > > > > > > > AsyncServerExchangeHandler subclass to play nicely with
> > > > >
> > > > > Kotlin
> > > > > > > > coroutines
> > > > > > > > on the AsyncDataProducer side of things, i.e. minimizing
> > > > > > > > produce()
> > > > > > > > polling
> > > > > > > > and avoiding buffering.
> > > > > > > >
> > > > > > > > I haven't been as successful with throttling calls on the
> > > > > > > > AsyncDataConsumer
> > > > > > > > side, i.e. consume() calls keep being made even though
> > > > > > > > the
> > > > > > > > capacity
> > > > > > > > window
> > > > > > > > has gone negative. I think this might be the expected
> > > > >
> > > > > behavior
> > > > > > > > because of
> > > > > > > > this comment in AbstractHttp1StreamDuplexer:
> > > > > > > >
> > > > > > > >         // At present the consumer can be forced to
> > > > > > > > consume
> > > > >
> > > > > data
> > > > > > > >         // over its declared capacity in order to avoid
> > > > >
> > > > > having
> > > > > > > >         // unprocessed message body content stuck in the
> > > > >
> > > > > session
> > > > > > > >         // input buffer
> > > > > > > >
> > > > > > > > Does that refer to just the case where the capacity
> > > > > > > > starts
> > > > > > > > positive
> > > > > > > > but
> > > > > > > > data exceeding the capacity is delivered to consume()? Or
> > > > >
> > > > > does it
> > > > > > > > refer to
> > > > > > > > the behavior I see, which is that capacity updates (or
> > > > > > > > the
> > > > >
> > > > > lack
> > > > > > > > of
> > > > > > > > them)
> > > > > > > > don't seem to have any effect for HTTP/1.1?
> > > > > > > >
> > > > > > >
> > > > > > > It is the former. Whatever data read from the underlying
> > > > > > > socket
> > > > > > > channel
> > > > > > > into the session buffer will be force-fed into the consumer
> > > > > > > regardless
> > > > > > > of its declared capacity for HTTP/1.1 connections.
> > > > > > >
> > > > > > > > I've also tried running the
> > > > > > > > Http1IntegrationTest.testSlowResponseConsumer()
> > > > > > > > test, substituting this line to trigger updateCapacity()
> > > > >
> > > > > calls:
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > >
> > > > > client.start(Http1Config.custom().setBufferSize(256).setInitial
> > > > > Wi
> > > > > > > > ndow
> > > > > > > > Size(32).build());
> > > > > > > >
> > > > > > >
> > > > > > > This change alone will have little effect without adjusting
> > > > > > > the
> > > > > > > session
> > > > > > > buffer side as well. Given that the default session buffer
> > > > > > > size
> > > > >
> > > > > is
> > > > > > > 8192
> > > > > > > with 32 byte initial capacity window one is likely to get
> > > > >
> > > > > multiple
> > > > > > > #consume invocations with negative capacity.
> > > > > > >
> > > > > > >
> > > > > > > > By adding the small initial window, I can see the
> > > > >
> > > > > capacityWindow
> > > > > > > > going more
> > > > > > > > and more negative on each consume() call with all the
> > > > > > > > data
> > > > > > > > buffered
> > > > > > > > before
> > > > > > > > the test code completes its first sleep.
> > > > > > > >
> > > > > > > > I don't have a specific use case for a slow consumer,
> > > > > > > > just
> > > > >
> > > > > want
> > > > > > > > to
> > > > > > > > know if
> > > > > > > > I'm misunderstanding something.
> > > > > > > >
> > > > > > >
> > > > > > > I realized that we likely need to adjust the session buffer
> > > > >
> > > > > size
> > > > > > > automatically when the initial window setting is below than
> > > > >
> > > > > value.
> > > > > > >
> > > > > > > Hope this helps
> > > > > > >
> > > > > > > Oleg
> > > > > > >
> > > > > > > > Thanks!
> > > > > > > > Roy
> > > > > > > >
> > > > > > > > On Fri, Sep 6, 2019 at 10:15 AM Roy Hashimoto <
> > > > > > > > roy.hashimoto@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Those are good leads, I'll pursue them.
> > > > > > > > >
> > > > > > > > > Thanks!
> > > > > > > > > Roy
> > > > > > > > >
> > > > > > > > > On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski <
> > > > > > > > > olegk@apache.org>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt
> > > > > > > > > > wrote:
> > > > > > > > > > > Have you looked at the reactive extensions for
> > > > >
> > > > > HttpCore5?
> > > > > > > > > > > They
> > > > > > > > > > > demonstrate
> > > > > > > > > > > how to implement
> > > > > > > > > > > AsyncEntityProducer/AsyncDataProducer
> > > > >
> > > > > with
> > > > > > > > > > > support
> > > > > > > > > > > for
> > > > > > > > > > > backpressure (or you can just use the Reactive
> > > > > > > > > > > Streams
> > > > >
> > > > > API
> > > > > > > > > > > instead):
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > >
> > > > > > >
> > >
> > >
>
> https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Just a bit of background. In 5.0 one can no longer
> > > > > > > > > > assume
> > > > > > > > > > that
> > > > > > > > > > one
> > > > > > > > > > message exchange has exclusive ownership of the
> > > > >
> > > > > underlying
> > > > > > > > > > connection.
> > > > > > > > > > Multiplexed message exchanges in HTTP/2 and
> > > > > > > > > > piplelined
> > > > > > > > > > message
> > > > > > > > > > exchanges in HTTP/1.1 must not block other concurrent
> > > > > > > > > > exchanges.
> > > > > > > > > > Message changes however can update their current
> > > > > > > > > > capacity
> > > > >
> > > > > via
> > > > > > > > > > `CapacityChannel`. Reactive extensions is a great
> > > > > > > > > > example
> > > > >
> > > > > and
> > > > > > > > > > also an
> > > > > > > > > > alternative to the native APIs per Ryan's
> > > > > > > > > > recommendation.
> > > > > > > > > >
> > > > > > > > > > If you prefer the native APIs you can take a look at
> > > > > > > > > > the
> > > > > > > > > > classic
> > > > > > > > > > I/O
> > > > > > > > > > adaptors that essentially emulate the classic
> > > > > > > > > > blocking
> > > > >
> > > > > i/o on
> > > > > > > > > > top
> > > > > > > > > > of
> > > > > > > > > > the new async APIs [1] or HTTP/1.1 integration tests
> > > > > > > > > > [2]
> > > > >
> > > > > that
> > > > > > > > > > have a
> > > > > > > > > > number of 'slow' consumer / producer test cases.
> > > > > > > > > >
> > > > > > > > > > Cheers
> > > > > > > > > >
> > > > > > > > > > Oleg
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > > > > > > >
> > > > > > >
> > > > > > >
> > >
> > >
>
> https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic
> > > > > > > > > > [2]
> > > > > > > > > >
> > > > > > >
> > > > > > >
> > >
> > >
>
> https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto <
> > > > > > > > > > > roy.hashimoto@gmail.com
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > I'm playing with asynchronous handlers in
> > > > > > > > > > > > HttpCore 5,
> > > > >
> > > > > and
> > > > > > > > > > > > I'd
> > > > > > > > > > > > like
> > > > > > > > > > > > to have
> > > > > > > > > > > > an AsyncEntityProducer write data at its own
> > > > > > > > > > > > (slow)
> > > > >
> > > > > rate
> > > > > > > > > > > > like
> > > > > > > > > > > > in
> > > > > > > > > > > > this old
> > > > > > > > > > > > thread <
> > > > > > > > > > > >
> > > > > > >
> > > > > > >
> > > > >
> > > > >
> https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2
> > > > > > > > > > > > > .
> > > > > > > > > > > >
> > > > > > > > > > > > Writing to the DataStreamChannel whenever I want
> > > > > > > > > > > > -
> > > > > > > > > > > > outside
> > > > > > > > > > > > the
> > > > > > > > > > > > scope of a
> > > > > > > > > > > > produce() method call - works fine, but I notice
> > > > > > > > > > > > that
> > > > > > > > > > > > produce() is
> > > > > > > > > > > > being
> > > > > > > > > > > > called every 5-6 milliseconds which ideally I
> > > > > > > > > > > > would
> > > > >
> > > > > like
> > > > > > > > > > > > to
> > > > > > > > > > > > eliminate or
> > > > > > > > > > > > reduce.
> > > > > > > > > > > >
> > > > > > > > > > > > The answer in the old thread was to use
> > > > > > > > > > > > IOControl.suspendOutput()
> > > > > > > > > > > > and
> > > > > > > > > > > > IOControl.requestOutput(), but this class appears
> > > > > > > > > > > > no
> > > > > > > > > > > > longer
> > > > > > > > > > > > to be
> > > > > > > > > > > > in
> > > > > > > > > > > > HttpCore 5. I see that there is a
> > > > > > > > > > > > DataStreamChannel.requestOutput()
> > > > > > > > > > > > but I
> > > > > > > > > > > > haven't figured out what suspension call that
> > > > > > > > > > > > should
> > > > >
> > > > > be
> > > > > > > > > > > > paired
> > > > > > > > > > > > with. I have
> > > > > > > > > > > > tried simply returning 0 from my
> > > > > > > > > > > > AsyncEntityProducer.available()
> > > > > > > > > > > > override,
> > > > > > > > > > > > but that doesn't seem to be it.
> > > > > > > > > > > >
> > > > > > > > > > > > Is there a new way to suspend/resume output in
> > > > >
> > > > > HttpCore
> > > > > > > > > > > > 5?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks!
> > > > > > > > > > > > Roy
> > > > > > > > > > > >
> > > > > > > > > > > > Kotlin source here
> > > > > > > > > > > > <
> > > > > > > > > > > >
> > > > > > >
> > > > > > >
> > > > >
> > > > >
> https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250
> > > > > > > > > > > > > .
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > ---------------------------------------------------
> > > > > > > > > > ----
> > > > >
> > > > > ----
> > > > > > > > > > ----
> > > > > > > > > > ------
> > > > > > > > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > > > > > > > For additional commands, e-mail:
> > > > > > > > > > dev-help@hc.apache.org
> > > > > > > > > >
> > > > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > ---------------------------------------------------------
> > > > > > > ----
> > > > >
> > > > > ----
> > > > > > > ----
> > > > > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > > > > >
> > > > > > >
> > > > >
> > > > >
> > > > > -------------------------------------------------------------
> > > > > ----
> > > > > ----
> > > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > > >
> > > >
> > > > ---------------------------------------------------------------
> > > > ------
> > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > For additional commands, e-mail: dev-help@hc.apache.org
> > >
> > >
> > > -----------------------------------------------------------------
> > > ----
> > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > For additional commands, e-mail: dev-help@hc.apache.org
> > >
> > >
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> For additional commands, e-mail: dev-help@hc.apache.org
>
>

Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Oleg Kalnichevski <ol...@apache.org>.
On Mon, 2019-09-09 at 11:45 -0700, Roy Hashimoto wrote:
> I have observed it clearing the bit. I have seen the blocking read
> here
> <
> https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L271
> >
> happen when the Selector interestOps is 0, so the Selector does not
> seem to
> be gating the read.
> 

Read / write operations on IOSession are non-blocking and never block.

Oleg 

> Roy
> 
> On Mon, Sep 9, 2019 at 11:37 AM Oleg Kalnichevski <ol...@apache.org>
> wrote:
> 
> > On Mon, 2019-09-09 at 11:01 -0700, Roy Hashimoto wrote:
> > > If this is a real issue, I believe it happens with
> > > AbstractHttp1StreamDuplexer when consuming entities on both
> > > server
> > > and client. Here is a sample server program to investigate the
> > > behavior. It would probably be easier to test with a client
> > > program
> > > but I've been using the server API so that was quicker for me to
> > > write.
> > > 
> > > The sample is pretty minimal. All I've done is set the HTTP1
> > > initial
> > > window size to 8192 and add an AsyncExchangeHandler that creates
> > > an
> > > empty response and logs the AsyncDataConsumer method
> > > implementations
> > > to the console. Note that I intentionally never update the
> > > CapacityChannel - I have effectively made an infinitely slow
> > > consumer.
> > > 
> > > To test, post data of more than 8 KB to the server (e.g. using
> > > curl).
> > > I expect to see in the log consume() calls totaling at least 8192
> > > and
> > > nothing after that until the socket times out. Instead, what I
> > > see is
> > > consume() calls delivering all the data and then streamEnd().
> > > 
> > > Here's sample output for uploading almost 32 KB. I have used my
> > > IDE
> > > to add a logging breakpoint to show the capacity window on this
> > > line,
> > > so those are also included in the output below (in green if
> > > you're
> > > viewing HTML mail).
> > > 
> > > Mon Sep 09, 2019 10:27:18.893 AM SlowConsumerTest main FINE:
> > > Listening on /0:0:0:0:0:0:0:0:8080
> > > Mon Sep 09, 2019 10:27:31.294 AM
> > > SlowConsumerTest$MyExchangeHandler
> > > handleRequest FINEST: handleRequest called
> > > Mon Sep 09, 2019 10:27:31.332 AM
> > > SlowConsumerTest$MyExchangeHandler
> > > consume FINEST: consume 8192 total 8192
> > > AbstractHttp1StreamDuplexer:313 capacity=0
> > > Mon Sep 09, 2019 10:27:31.341 AM
> > > SlowConsumerTest$MyExchangeHandler
> > > updateCapacity FINEST: updateCapacity called
> > > Mon Sep 09, 2019 10:27:31.342 AM
> > > SlowConsumerTest$MyExchangeHandler
> > > consume FINEST: consume 8192 total 16384
> > > AbstractHttp1StreamDuplexer:313 capacity=-8192
> > > Mon Sep 09, 2019 10:27:31.346 AM
> > > SlowConsumerTest$MyExchangeHandler
> > > updateCapacity FINEST: updateCapacity called
> > > Mon Sep 09, 2019 10:27:31.348 AM
> > > SlowConsumerTest$MyExchangeHandler
> > > consume FINEST: consume 8192 total 24576
> > > AbstractHttp1StreamDuplexer:313 capacity=-16384
> > > Mon Sep 09, 2019 10:27:31.352 AM
> > > SlowConsumerTest$MyExchangeHandler
> > > updateCapacity FINEST: updateCapacity called
> > > Mon Sep 09, 2019 10:27:31.354 AM
> > > SlowConsumerTest$MyExchangeHandler
> > > consume FINEST: consume 8150 total 32726
> > > AbstractHttp1StreamDuplexer:313 capacity=-24534
> > > Mon Sep 09, 2019 10:27:31.357 AM
> > > SlowConsumerTest$MyExchangeHandler
> > > streamEnd FINEST: streamEnd called
> > > 
> > > My expectation is that there would no calls to consume() once
> > > capacity reached 0 and we would never see streamEnd().
> > > 
> > 
> > Roy
> > 
> > My expectations would be the same. The protocol handler should be
> > clearing read interest if the capacity window drops below zero,
> > see:
> > 
> > 
> > 
https://github.com/apache/httpcomponents-core/blob/master/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java#L609
> > 
> > I will investigate why it does not happen.
> > 
> > Oleg
> > 
> > > Roy
> > > 
> > > On Mon, Sep 9, 2019 at 8:10 AM Oleg Kalnichevski <
> > > olegk@apache.org>
> > > wrote:
> > > > On Mon, 2019-09-09 at 07:58 -0700, Roy Hashimoto wrote:
> > > > > > 
> > > > > > > 
> > > > > > 
> > > > > > 
> > > > 
> > > > client.start(Http1Config.custom().setBufferSize(256).setInitial
> > > > Wind
> > > > > > owSize(32).build());
> > > > > > > 
> > > > > > 
> > > > > > This change alone will have little effect without adjusting
> > > > > > the
> > > > > > session
> > > > > > buffer side as well. Given that the default session buffer
> > > > > > size
> > > > 
> > > > is
> > > > > > 8192
> > > > > > with 32 byte initial capacity window one is likely to get
> > > > 
> > > > multiple
> > > > > > #consume invocations with negative capacity.
> > > > > 
> > > > > 
> > > > > Hmm. Isn't setBufferSize(256) for the session buffer? Also,
> > > > > if I
> > > > > increase
> > > > > the amount of request data in the test to more than 8 KB:
> > > > > 
> > > > >   return new MultiLineResponseHandler("0123456789abcd",
> > > > > 1000);
> > > > > 
> > > > > Then I can see the capacity window go far more negative than
> > > > 
> > > > -8192,
> > > > > which I
> > > > > wouldn't expect with your explanation.
> > > > > 
> > > > > It looks like AbstractHttp1StreamDuplexer reads its data with
> > > > > a
> > > > > blocking
> > > > > read without explicitly waiting on the NIO Selector
> > > > > configured by
> > > > > CapacityChannel (unless I missed it in the code). Does this
> > > > > type
> > > > 
> > > > of
> > > > > read
> > > > > implicitly use the Selector? It doesn't appear to me that it
> > > > 
> > > > does.
> > > > > For
> > > > > example, I can reach this line
> > > > > <
> > > > > 
> > 
> > 
https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L272
> > > > > > 
> > > > > 
> > > > > right
> > > > > after the blocking read() when
> > > > > session.ioSession.key.interestOps
> > > > 
> > > > is
> > > > > 0.
> > > > > 
> > > > > Don't waste time on this if I'm not making sense. I'm not
> > > > > that
> > > > > familiar
> > > > > with the NIO API, and I don't have a use case that depends on
> > > > 
> > > > this.
> > > > > Just
> > > > > wanted to bring it to someone's attention in case it
> > > > > indicated an
> > > > > issue.
> > > > > 
> > > > > Roy
> > > > 
> > > > Roy
> > > > 
> > > > I am not sure we are talking about the same things because it
> > > > do
> > > > not
> > > > even understand if you are talking about the server side,
> > > > client
> > > > side
> > > > or both. Can you put together a test application that
> > > > reproduces
> > > > the
> > > > issue and describe what think is wrong?
> > > > 
> > > > Oleg
> > > > 
> > > > > 
> > > > > On Mon, Sep 9, 2019 at 2:02 AM Oleg Kalnichevski <
> > > > 
> > > > olegk@apache.org>
> > > > > wrote:
> > > > > 
> > > > > > On Sun, 2019-09-08 at 17:07 -0700, Roy Hashimoto wrote:
> > > > > > > By looking at the suggested 5.0 examples I was able to
> > > > > > > get an
> > > > > > > AsyncServerExchangeHandler subclass to play nicely with
> > > > 
> > > > Kotlin
> > > > > > > coroutines
> > > > > > > on the AsyncDataProducer side of things, i.e. minimizing
> > > > > > > produce()
> > > > > > > polling
> > > > > > > and avoiding buffering.
> > > > > > > 
> > > > > > > I haven't been as successful with throttling calls on the
> > > > > > > AsyncDataConsumer
> > > > > > > side, i.e. consume() calls keep being made even though
> > > > > > > the
> > > > > > > capacity
> > > > > > > window
> > > > > > > has gone negative. I think this might be the expected
> > > > 
> > > > behavior
> > > > > > > because of
> > > > > > > this comment in AbstractHttp1StreamDuplexer:
> > > > > > > 
> > > > > > >         // At present the consumer can be forced to
> > > > > > > consume
> > > > 
> > > > data
> > > > > > >         // over its declared capacity in order to avoid
> > > > 
> > > > having
> > > > > > >         // unprocessed message body content stuck in the
> > > > 
> > > > session
> > > > > > >         // input buffer
> > > > > > > 
> > > > > > > Does that refer to just the case where the capacity
> > > > > > > starts
> > > > > > > positive
> > > > > > > but
> > > > > > > data exceeding the capacity is delivered to consume()? Or
> > > > 
> > > > does it
> > > > > > > refer to
> > > > > > > the behavior I see, which is that capacity updates (or
> > > > > > > the
> > > > 
> > > > lack
> > > > > > > of
> > > > > > > them)
> > > > > > > don't seem to have any effect for HTTP/1.1?
> > > > > > > 
> > > > > > 
> > > > > > It is the former. Whatever data read from the underlying
> > > > > > socket
> > > > > > channel
> > > > > > into the session buffer will be force-fed into the consumer
> > > > > > regardless
> > > > > > of its declared capacity for HTTP/1.1 connections.
> > > > > > 
> > > > > > > I've also tried running the
> > > > > > > Http1IntegrationTest.testSlowResponseConsumer()
> > > > > > > test, substituting this line to trigger updateCapacity()
> > > > 
> > > > calls:
> > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > 
> > > > client.start(Http1Config.custom().setBufferSize(256).setInitial
> > > > Wi
> > > > > > > ndow
> > > > > > > Size(32).build());
> > > > > > > 
> > > > > > 
> > > > > > This change alone will have little effect without adjusting
> > > > > > the
> > > > > > session
> > > > > > buffer side as well. Given that the default session buffer
> > > > > > size
> > > > 
> > > > is
> > > > > > 8192
> > > > > > with 32 byte initial capacity window one is likely to get
> > > > 
> > > > multiple
> > > > > > #consume invocations with negative capacity.
> > > > > > 
> > > > > > 
> > > > > > > By adding the small initial window, I can see the
> > > > 
> > > > capacityWindow
> > > > > > > going more
> > > > > > > and more negative on each consume() call with all the
> > > > > > > data
> > > > > > > buffered
> > > > > > > before
> > > > > > > the test code completes its first sleep.
> > > > > > > 
> > > > > > > I don't have a specific use case for a slow consumer,
> > > > > > > just
> > > > 
> > > > want
> > > > > > > to
> > > > > > > know if
> > > > > > > I'm misunderstanding something.
> > > > > > > 
> > > > > > 
> > > > > > I realized that we likely need to adjust the session buffer
> > > > 
> > > > size
> > > > > > automatically when the initial window setting is below than
> > > > 
> > > > value.
> > > > > > 
> > > > > > Hope this helps
> > > > > > 
> > > > > > Oleg
> > > > > > 
> > > > > > > Thanks!
> > > > > > > Roy
> > > > > > > 
> > > > > > > On Fri, Sep 6, 2019 at 10:15 AM Roy Hashimoto <
> > > > > > > roy.hashimoto@gmail.com>
> > > > > > > wrote:
> > > > > > > 
> > > > > > > > Those are good leads, I'll pursue them.
> > > > > > > > 
> > > > > > > > Thanks!
> > > > > > > > Roy
> > > > > > > > 
> > > > > > > > On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski <
> > > > > > > > olegk@apache.org>
> > > > > > > > wrote:
> > > > > > > > 
> > > > > > > > > On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt
> > > > > > > > > wrote:
> > > > > > > > > > Have you looked at the reactive extensions for
> > > > 
> > > > HttpCore5?
> > > > > > > > > > They
> > > > > > > > > > demonstrate
> > > > > > > > > > how to implement
> > > > > > > > > > AsyncEntityProducer/AsyncDataProducer
> > > > 
> > > > with
> > > > > > > > > > support
> > > > > > > > > > for
> > > > > > > > > > backpressure (or you can just use the Reactive
> > > > > > > > > > Streams
> > > > 
> > > > API
> > > > > > > > > > instead):
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > 
> > > > > > 
> > 
> > 
https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > Just a bit of background. In 5.0 one can no longer
> > > > > > > > > assume
> > > > > > > > > that
> > > > > > > > > one
> > > > > > > > > message exchange has exclusive ownership of the
> > > > 
> > > > underlying
> > > > > > > > > connection.
> > > > > > > > > Multiplexed message exchanges in HTTP/2 and
> > > > > > > > > piplelined
> > > > > > > > > message
> > > > > > > > > exchanges in HTTP/1.1 must not block other concurrent
> > > > > > > > > exchanges.
> > > > > > > > > Message changes however can update their current
> > > > > > > > > capacity
> > > > 
> > > > via
> > > > > > > > > `CapacityChannel`. Reactive extensions is a great
> > > > > > > > > example
> > > > 
> > > > and
> > > > > > > > > also an
> > > > > > > > > alternative to the native APIs per Ryan's
> > > > > > > > > recommendation.
> > > > > > > > > 
> > > > > > > > > If you prefer the native APIs you can take a look at
> > > > > > > > > the
> > > > > > > > > classic
> > > > > > > > > I/O
> > > > > > > > > adaptors that essentially emulate the classic
> > > > > > > > > blocking
> > > > 
> > > > i/o on
> > > > > > > > > top
> > > > > > > > > of
> > > > > > > > > the new async APIs [1] or HTTP/1.1 integration tests
> > > > > > > > > [2]
> > > > 
> > > > that
> > > > > > > > > have a
> > > > > > > > > number of 'slow' consumer / producer test cases.
> > > > > > > > > 
> > > > > > > > > Cheers
> > > > > > > > > 
> > > > > > > > > Oleg
> > > > > > > > > 
> > > > > > > > > [1]
> > > > > > > > > 
> > > > > > 
> > > > > > 
> > 
> > 
https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic
> > > > > > > > > [2]
> > > > > > > > > 
> > > > > > 
> > > > > > 
> > 
> > 
https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto <
> > > > > > > > > > roy.hashimoto@gmail.com
> > > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > wrote:
> > > > > > > > > > 
> > > > > > > > > > > I'm playing with asynchronous handlers in
> > > > > > > > > > > HttpCore 5,
> > > > 
> > > > and
> > > > > > > > > > > I'd
> > > > > > > > > > > like
> > > > > > > > > > > to have
> > > > > > > > > > > an AsyncEntityProducer write data at its own
> > > > > > > > > > > (slow)
> > > > 
> > > > rate
> > > > > > > > > > > like
> > > > > > > > > > > in
> > > > > > > > > > > this old
> > > > > > > > > > > thread <
> > > > > > > > > > > 
> > > > > > 
> > > > > > 
> > > > 
> > > > 
https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2
> > > > > > > > > > > > .
> > > > > > > > > > > 
> > > > > > > > > > > Writing to the DataStreamChannel whenever I want
> > > > > > > > > > > -
> > > > > > > > > > > outside
> > > > > > > > > > > the
> > > > > > > > > > > scope of a
> > > > > > > > > > > produce() method call - works fine, but I notice
> > > > > > > > > > > that
> > > > > > > > > > > produce() is
> > > > > > > > > > > being
> > > > > > > > > > > called every 5-6 milliseconds which ideally I
> > > > > > > > > > > would
> > > > 
> > > > like
> > > > > > > > > > > to
> > > > > > > > > > > eliminate or
> > > > > > > > > > > reduce.
> > > > > > > > > > > 
> > > > > > > > > > > The answer in the old thread was to use
> > > > > > > > > > > IOControl.suspendOutput()
> > > > > > > > > > > and
> > > > > > > > > > > IOControl.requestOutput(), but this class appears
> > > > > > > > > > > no
> > > > > > > > > > > longer
> > > > > > > > > > > to be
> > > > > > > > > > > in
> > > > > > > > > > > HttpCore 5. I see that there is a
> > > > > > > > > > > DataStreamChannel.requestOutput()
> > > > > > > > > > > but I
> > > > > > > > > > > haven't figured out what suspension call that
> > > > > > > > > > > should
> > > > 
> > > > be
> > > > > > > > > > > paired
> > > > > > > > > > > with. I have
> > > > > > > > > > > tried simply returning 0 from my
> > > > > > > > > > > AsyncEntityProducer.available()
> > > > > > > > > > > override,
> > > > > > > > > > > but that doesn't seem to be it.
> > > > > > > > > > > 
> > > > > > > > > > > Is there a new way to suspend/resume output in
> > > > 
> > > > HttpCore
> > > > > > > > > > > 5?
> > > > > > > > > > > 
> > > > > > > > > > > Thanks!
> > > > > > > > > > > Roy
> > > > > > > > > > > 
> > > > > > > > > > > Kotlin source here
> > > > > > > > > > > <
> > > > > > > > > > > 
> > > > > > 
> > > > > > 
> > > > 
> > > > 
https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250
> > > > > > > > > > > > .
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > ---------------------------------------------------
> > > > > > > > > ----
> > > > 
> > > > ----
> > > > > > > > > ----
> > > > > > > > > ------
> > > > > > > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > > > > > > For additional commands, e-mail: 
> > > > > > > > > dev-help@hc.apache.org
> > > > > > > > > 
> > > > > > > > > 
> > > > > > 
> > > > > > 
> > > > > > ---------------------------------------------------------
> > > > > > ----
> > > > 
> > > > ----
> > > > > > ----
> > > > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > > > > 
> > > > > > 
> > > > 
> > > > 
> > > > -------------------------------------------------------------
> > > > ----
> > > > ----
> > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > > 
> > > 
> > > ---------------------------------------------------------------
> > > ------
> > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > For additional commands, e-mail: dev-help@hc.apache.org
> > 
> > 
> > -----------------------------------------------------------------
> > ----
> > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > For additional commands, e-mail: dev-help@hc.apache.org
> > 
> > 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Roy Hashimoto <ro...@gmail.com>.
I have observed it clearing the bit. I have seen the blocking read here
<https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L271>
happen when the Selector interestOps is 0, so the Selector does not seem to
be gating the read.

Roy

On Mon, Sep 9, 2019 at 11:37 AM Oleg Kalnichevski <ol...@apache.org> wrote:

> On Mon, 2019-09-09 at 11:01 -0700, Roy Hashimoto wrote:
> > If this is a real issue, I believe it happens with
> > AbstractHttp1StreamDuplexer when consuming entities on both server
> > and client. Here is a sample server program to investigate the
> > behavior. It would probably be easier to test with a client program
> > but I've been using the server API so that was quicker for me to
> > write.
> >
> > The sample is pretty minimal. All I've done is set the HTTP1 initial
> > window size to 8192 and add an AsyncExchangeHandler that creates an
> > empty response and logs the AsyncDataConsumer method implementations
> > to the console. Note that I intentionally never update the
> > CapacityChannel - I have effectively made an infinitely slow
> > consumer.
> >
> > To test, post data of more than 8 KB to the server (e.g. using curl).
> > I expect to see in the log consume() calls totaling at least 8192 and
> > nothing after that until the socket times out. Instead, what I see is
> > consume() calls delivering all the data and then streamEnd().
> >
> > Here's sample output for uploading almost 32 KB. I have used my IDE
> > to add a logging breakpoint to show the capacity window on this line,
> > so those are also included in the output below (in green if you're
> > viewing HTML mail).
> >
> > Mon Sep 09, 2019 10:27:18.893 AM SlowConsumerTest main FINE:
> > Listening on /0:0:0:0:0:0:0:0:8080
> > Mon Sep 09, 2019 10:27:31.294 AM SlowConsumerTest$MyExchangeHandler
> > handleRequest FINEST: handleRequest called
> > Mon Sep 09, 2019 10:27:31.332 AM SlowConsumerTest$MyExchangeHandler
> > consume FINEST: consume 8192 total 8192
> > AbstractHttp1StreamDuplexer:313 capacity=0
> > Mon Sep 09, 2019 10:27:31.341 AM SlowConsumerTest$MyExchangeHandler
> > updateCapacity FINEST: updateCapacity called
> > Mon Sep 09, 2019 10:27:31.342 AM SlowConsumerTest$MyExchangeHandler
> > consume FINEST: consume 8192 total 16384
> > AbstractHttp1StreamDuplexer:313 capacity=-8192
> > Mon Sep 09, 2019 10:27:31.346 AM SlowConsumerTest$MyExchangeHandler
> > updateCapacity FINEST: updateCapacity called
> > Mon Sep 09, 2019 10:27:31.348 AM SlowConsumerTest$MyExchangeHandler
> > consume FINEST: consume 8192 total 24576
> > AbstractHttp1StreamDuplexer:313 capacity=-16384
> > Mon Sep 09, 2019 10:27:31.352 AM SlowConsumerTest$MyExchangeHandler
> > updateCapacity FINEST: updateCapacity called
> > Mon Sep 09, 2019 10:27:31.354 AM SlowConsumerTest$MyExchangeHandler
> > consume FINEST: consume 8150 total 32726
> > AbstractHttp1StreamDuplexer:313 capacity=-24534
> > Mon Sep 09, 2019 10:27:31.357 AM SlowConsumerTest$MyExchangeHandler
> > streamEnd FINEST: streamEnd called
> >
> > My expectation is that there would no calls to consume() once
> > capacity reached 0 and we would never see streamEnd().
> >
>
> Roy
>
> My expectations would be the same. The protocol handler should be
> clearing read interest if the capacity window drops below zero, see:
>
>
> https://github.com/apache/httpcomponents-core/blob/master/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java#L609
>
> I will investigate why it does not happen.
>
> Oleg
>
> > Roy
> >
> > On Mon, Sep 9, 2019 at 8:10 AM Oleg Kalnichevski <ol...@apache.org>
> > wrote:
> > > On Mon, 2019-09-09 at 07:58 -0700, Roy Hashimoto wrote:
> > > > >
> > > > > >
> > > > >
> > > > >
> > > client.start(Http1Config.custom().setBufferSize(256).setInitialWind
> > > > > owSize(32).build());
> > > > > >
> > > > >
> > > > > This change alone will have little effect without adjusting the
> > > > > session
> > > > > buffer side as well. Given that the default session buffer size
> > > is
> > > > > 8192
> > > > > with 32 byte initial capacity window one is likely to get
> > > multiple
> > > > > #consume invocations with negative capacity.
> > > >
> > > >
> > > > Hmm. Isn't setBufferSize(256) for the session buffer? Also, if I
> > > > increase
> > > > the amount of request data in the test to more than 8 KB:
> > > >
> > > >   return new MultiLineResponseHandler("0123456789abcd", 1000);
> > > >
> > > > Then I can see the capacity window go far more negative than
> > > -8192,
> > > > which I
> > > > wouldn't expect with your explanation.
> > > >
> > > > It looks like AbstractHttp1StreamDuplexer reads its data with a
> > > > blocking
> > > > read without explicitly waiting on the NIO Selector configured by
> > > > CapacityChannel (unless I missed it in the code). Does this type
> > > of
> > > > read
> > > > implicitly use the Selector? It doesn't appear to me that it
> > > does.
> > > > For
> > > > example, I can reach this line
> > > > <
> > > >
> > >
> https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L272
> > > > >
> > > > right
> > > > after the blocking read() when session.ioSession.key.interestOps
> > > is
> > > > 0.
> > > >
> > > > Don't waste time on this if I'm not making sense. I'm not that
> > > > familiar
> > > > with the NIO API, and I don't have a use case that depends on
> > > this.
> > > > Just
> > > > wanted to bring it to someone's attention in case it indicated an
> > > > issue.
> > > >
> > > > Roy
> > >
> > > Roy
> > >
> > > I am not sure we are talking about the same things because it do
> > > not
> > > even understand if you are talking about the server side, client
> > > side
> > > or both. Can you put together a test application that reproduces
> > > the
> > > issue and describe what think is wrong?
> > >
> > > Oleg
> > >
> > > >
> > > > On Mon, Sep 9, 2019 at 2:02 AM Oleg Kalnichevski <
> > > olegk@apache.org>
> > > > wrote:
> > > >
> > > > > On Sun, 2019-09-08 at 17:07 -0700, Roy Hashimoto wrote:
> > > > > > By looking at the suggested 5.0 examples I was able to get an
> > > > > > AsyncServerExchangeHandler subclass to play nicely with
> > > Kotlin
> > > > > > coroutines
> > > > > > on the AsyncDataProducer side of things, i.e. minimizing
> > > > > > produce()
> > > > > > polling
> > > > > > and avoiding buffering.
> > > > > >
> > > > > > I haven't been as successful with throttling calls on the
> > > > > > AsyncDataConsumer
> > > > > > side, i.e. consume() calls keep being made even though the
> > > > > > capacity
> > > > > > window
> > > > > > has gone negative. I think this might be the expected
> > > behavior
> > > > > > because of
> > > > > > this comment in AbstractHttp1StreamDuplexer:
> > > > > >
> > > > > >         // At present the consumer can be forced to consume
> > > data
> > > > > >         // over its declared capacity in order to avoid
> > > having
> > > > > >         // unprocessed message body content stuck in the
> > > session
> > > > > >         // input buffer
> > > > > >
> > > > > > Does that refer to just the case where the capacity starts
> > > > > > positive
> > > > > > but
> > > > > > data exceeding the capacity is delivered to consume()? Or
> > > does it
> > > > > > refer to
> > > > > > the behavior I see, which is that capacity updates (or the
> > > lack
> > > > > > of
> > > > > > them)
> > > > > > don't seem to have any effect for HTTP/1.1?
> > > > > >
> > > > >
> > > > > It is the former. Whatever data read from the underlying socket
> > > > > channel
> > > > > into the session buffer will be force-fed into the consumer
> > > > > regardless
> > > > > of its declared capacity for HTTP/1.1 connections.
> > > > >
> > > > > > I've also tried running the
> > > > > > Http1IntegrationTest.testSlowResponseConsumer()
> > > > > > test, substituting this line to trigger updateCapacity()
> > > calls:
> > > > > >
> > > > > >
> > > > > >
> > > client.start(Http1Config.custom().setBufferSize(256).setInitialWi
> > > > > > ndow
> > > > > > Size(32).build());
> > > > > >
> > > > >
> > > > > This change alone will have little effect without adjusting the
> > > > > session
> > > > > buffer side as well. Given that the default session buffer size
> > > is
> > > > > 8192
> > > > > with 32 byte initial capacity window one is likely to get
> > > multiple
> > > > > #consume invocations with negative capacity.
> > > > >
> > > > >
> > > > > > By adding the small initial window, I can see the
> > > capacityWindow
> > > > > > going more
> > > > > > and more negative on each consume() call with all the data
> > > > > > buffered
> > > > > > before
> > > > > > the test code completes its first sleep.
> > > > > >
> > > > > > I don't have a specific use case for a slow consumer, just
> > > want
> > > > > > to
> > > > > > know if
> > > > > > I'm misunderstanding something.
> > > > > >
> > > > >
> > > > > I realized that we likely need to adjust the session buffer
> > > size
> > > > > automatically when the initial window setting is below than
> > > value.
> > > > >
> > > > > Hope this helps
> > > > >
> > > > > Oleg
> > > > >
> > > > > > Thanks!
> > > > > > Roy
> > > > > >
> > > > > > On Fri, Sep 6, 2019 at 10:15 AM Roy Hashimoto <
> > > > > > roy.hashimoto@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Those are good leads, I'll pursue them.
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Roy
> > > > > > >
> > > > > > > On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski <
> > > > > > > olegk@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt wrote:
> > > > > > > > > Have you looked at the reactive extensions for
> > > HttpCore5?
> > > > > > > > > They
> > > > > > > > > demonstrate
> > > > > > > > > how to implement AsyncEntityProducer/AsyncDataProducer
> > > with
> > > > > > > > > support
> > > > > > > > > for
> > > > > > > > > backpressure (or you can just use the Reactive Streams
> > > API
> > > > > > > > > instead):
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > >
> > > > >
> > >
>
> https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > > Just a bit of background. In 5.0 one can no longer assume
> > > > > > > > that
> > > > > > > > one
> > > > > > > > message exchange has exclusive ownership of the
> > > underlying
> > > > > > > > connection.
> > > > > > > > Multiplexed message exchanges in HTTP/2 and piplelined
> > > > > > > > message
> > > > > > > > exchanges in HTTP/1.1 must not block other concurrent
> > > > > > > > exchanges.
> > > > > > > > Message changes however can update their current capacity
> > > via
> > > > > > > > `CapacityChannel`. Reactive extensions is a great example
> > > and
> > > > > > > > also an
> > > > > > > > alternative to the native APIs per Ryan's recommendation.
> > > > > > > >
> > > > > > > > If you prefer the native APIs you can take a look at the
> > > > > > > > classic
> > > > > > > > I/O
> > > > > > > > adaptors that essentially emulate the classic blocking
> > > i/o on
> > > > > > > > top
> > > > > > > > of
> > > > > > > > the new async APIs [1] or HTTP/1.1 integration tests [2]
> > > that
> > > > > > > > have a
> > > > > > > > number of 'slow' consumer / producer test cases.
> > > > > > > >
> > > > > > > > Cheers
> > > > > > > >
> > > > > > > > Oleg
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > >
> > > > >
> > >
>
> https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic
> > > > > > > > [2]
> > > > > > > >
> > > > >
> > > > >
> > >
>
> https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
> > > > > > > >
> > > > > > > >
> > > > > > > > > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto <
> > > > > > > > > roy.hashimoto@gmail.com
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > I'm playing with asynchronous handlers in HttpCore 5,
> > > and
> > > > > > > > > > I'd
> > > > > > > > > > like
> > > > > > > > > > to have
> > > > > > > > > > an AsyncEntityProducer write data at its own (slow)
> > > rate
> > > > > > > > > > like
> > > > > > > > > > in
> > > > > > > > > > this old
> > > > > > > > > > thread <
> > > > > > > > > >
> > > > >
> > > > >
> > > https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2
> > > > > > > > > > > .
> > > > > > > > > >
> > > > > > > > > > Writing to the DataStreamChannel whenever I want -
> > > > > > > > > > outside
> > > > > > > > > > the
> > > > > > > > > > scope of a
> > > > > > > > > > produce() method call - works fine, but I notice that
> > > > > > > > > > produce() is
> > > > > > > > > > being
> > > > > > > > > > called every 5-6 milliseconds which ideally I would
> > > like
> > > > > > > > > > to
> > > > > > > > > > eliminate or
> > > > > > > > > > reduce.
> > > > > > > > > >
> > > > > > > > > > The answer in the old thread was to use
> > > > > > > > > > IOControl.suspendOutput()
> > > > > > > > > > and
> > > > > > > > > > IOControl.requestOutput(), but this class appears no
> > > > > > > > > > longer
> > > > > > > > > > to be
> > > > > > > > > > in
> > > > > > > > > > HttpCore 5. I see that there is a
> > > > > > > > > > DataStreamChannel.requestOutput()
> > > > > > > > > > but I
> > > > > > > > > > haven't figured out what suspension call that should
> > > be
> > > > > > > > > > paired
> > > > > > > > > > with. I have
> > > > > > > > > > tried simply returning 0 from my
> > > > > > > > > > AsyncEntityProducer.available()
> > > > > > > > > > override,
> > > > > > > > > > but that doesn't seem to be it.
> > > > > > > > > >
> > > > > > > > > > Is there a new way to suspend/resume output in
> > > HttpCore
> > > > > > > > > > 5?
> > > > > > > > > >
> > > > > > > > > > Thanks!
> > > > > > > > > > Roy
> > > > > > > > > >
> > > > > > > > > > Kotlin source here
> > > > > > > > > > <
> > > > > > > > > >
> > > > >
> > > > >
> > > https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250
> > > > > > > > > > > .
> > > > > > > >
> > > > > > > >
> > > > > > > > -------------------------------------------------------
> > > ----
> > > > > > > > ----
> > > > > > > > ------
> > > > > > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > > > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > > > > > >
> > > > > > > >
> > > > >
> > > > >
> > > > > -------------------------------------------------------------
> > > ----
> > > > > ----
> > > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > > >
> > > > >
> > >
> > >
> > > -----------------------------------------------------------------
> > > ----
> > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > For additional commands, e-mail: dev-help@hc.apache.org
> > >
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > For additional commands, e-mail: dev-help@hc.apache.org
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> For additional commands, e-mail: dev-help@hc.apache.org
>
>

Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Oleg Kalnichevski <ol...@apache.org>.
On Mon, 2019-09-09 at 11:01 -0700, Roy Hashimoto wrote:
> If this is a real issue, I believe it happens with
> AbstractHttp1StreamDuplexer when consuming entities on both server
> and client. Here is a sample server program to investigate the
> behavior. It would probably be easier to test with a client program
> but I've been using the server API so that was quicker for me to
> write.
> 
> The sample is pretty minimal. All I've done is set the HTTP1 initial
> window size to 8192 and add an AsyncExchangeHandler that creates an
> empty response and logs the AsyncDataConsumer method implementations
> to the console. Note that I intentionally never update the
> CapacityChannel - I have effectively made an infinitely slow
> consumer.
> 
> To test, post data of more than 8 KB to the server (e.g. using curl).
> I expect to see in the log consume() calls totaling at least 8192 and
> nothing after that until the socket times out. Instead, what I see is
> consume() calls delivering all the data and then streamEnd().
> 
> Here's sample output for uploading almost 32 KB. I have used my IDE
> to add a logging breakpoint to show the capacity window on this line,
> so those are also included in the output below (in green if you're
> viewing HTML mail).
> 
> Mon Sep 09, 2019 10:27:18.893 AM SlowConsumerTest main FINE:
> Listening on /0:0:0:0:0:0:0:0:8080
> Mon Sep 09, 2019 10:27:31.294 AM SlowConsumerTest$MyExchangeHandler
> handleRequest FINEST: handleRequest called
> Mon Sep 09, 2019 10:27:31.332 AM SlowConsumerTest$MyExchangeHandler
> consume FINEST: consume 8192 total 8192
> AbstractHttp1StreamDuplexer:313 capacity=0
> Mon Sep 09, 2019 10:27:31.341 AM SlowConsumerTest$MyExchangeHandler
> updateCapacity FINEST: updateCapacity called
> Mon Sep 09, 2019 10:27:31.342 AM SlowConsumerTest$MyExchangeHandler
> consume FINEST: consume 8192 total 16384
> AbstractHttp1StreamDuplexer:313 capacity=-8192
> Mon Sep 09, 2019 10:27:31.346 AM SlowConsumerTest$MyExchangeHandler
> updateCapacity FINEST: updateCapacity called
> Mon Sep 09, 2019 10:27:31.348 AM SlowConsumerTest$MyExchangeHandler
> consume FINEST: consume 8192 total 24576
> AbstractHttp1StreamDuplexer:313 capacity=-16384
> Mon Sep 09, 2019 10:27:31.352 AM SlowConsumerTest$MyExchangeHandler
> updateCapacity FINEST: updateCapacity called
> Mon Sep 09, 2019 10:27:31.354 AM SlowConsumerTest$MyExchangeHandler
> consume FINEST: consume 8150 total 32726
> AbstractHttp1StreamDuplexer:313 capacity=-24534
> Mon Sep 09, 2019 10:27:31.357 AM SlowConsumerTest$MyExchangeHandler
> streamEnd FINEST: streamEnd called
> 
> My expectation is that there would no calls to consume() once
> capacity reached 0 and we would never see streamEnd().
> 

Roy

My expectations would be the same. The protocol handler should be
clearing read interest if the capacity window drops below zero, see:

https://github.com/apache/httpcomponents-core/blob/master/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java#L609

I will investigate why it does not happen.

Oleg

> Roy
> 
> On Mon, Sep 9, 2019 at 8:10 AM Oleg Kalnichevski <ol...@apache.org>
> wrote:
> > On Mon, 2019-09-09 at 07:58 -0700, Roy Hashimoto wrote:
> > > > 
> > > > > 
> > > > 
> > > >
> > client.start(Http1Config.custom().setBufferSize(256).setInitialWind
> > > > owSize(32).build());
> > > > > 
> > > > 
> > > > This change alone will have little effect without adjusting the
> > > > session
> > > > buffer side as well. Given that the default session buffer size
> > is
> > > > 8192
> > > > with 32 byte initial capacity window one is likely to get
> > multiple
> > > > #consume invocations with negative capacity.
> > > 
> > > 
> > > Hmm. Isn't setBufferSize(256) for the session buffer? Also, if I
> > > increase
> > > the amount of request data in the test to more than 8 KB:
> > > 
> > >   return new MultiLineResponseHandler("0123456789abcd", 1000);
> > > 
> > > Then I can see the capacity window go far more negative than
> > -8192,
> > > which I
> > > wouldn't expect with your explanation.
> > > 
> > > It looks like AbstractHttp1StreamDuplexer reads its data with a
> > > blocking
> > > read without explicitly waiting on the NIO Selector configured by
> > > CapacityChannel (unless I missed it in the code). Does this type
> > of
> > > read
> > > implicitly use the Selector? It doesn't appear to me that it
> > does.
> > > For
> > > example, I can reach this line
> > > <
> > > 
> > https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L272
> > > >
> > > right
> > > after the blocking read() when session.ioSession.key.interestOps
> > is
> > > 0.
> > > 
> > > Don't waste time on this if I'm not making sense. I'm not that
> > > familiar
> > > with the NIO API, and I don't have a use case that depends on
> > this.
> > > Just
> > > wanted to bring it to someone's attention in case it indicated an
> > > issue.
> > > 
> > > Roy
> > 
> > Roy
> > 
> > I am not sure we are talking about the same things because it do
> > not
> > even understand if you are talking about the server side, client
> > side
> > or both. Can you put together a test application that reproduces
> > the
> > issue and describe what think is wrong?
> > 
> > Oleg
> > 
> > > 
> > > On Mon, Sep 9, 2019 at 2:02 AM Oleg Kalnichevski <
> > olegk@apache.org>
> > > wrote:
> > > 
> > > > On Sun, 2019-09-08 at 17:07 -0700, Roy Hashimoto wrote:
> > > > > By looking at the suggested 5.0 examples I was able to get an
> > > > > AsyncServerExchangeHandler subclass to play nicely with
> > Kotlin
> > > > > coroutines
> > > > > on the AsyncDataProducer side of things, i.e. minimizing
> > > > > produce()
> > > > > polling
> > > > > and avoiding buffering.
> > > > > 
> > > > > I haven't been as successful with throttling calls on the
> > > > > AsyncDataConsumer
> > > > > side, i.e. consume() calls keep being made even though the
> > > > > capacity
> > > > > window
> > > > > has gone negative. I think this might be the expected
> > behavior
> > > > > because of
> > > > > this comment in AbstractHttp1StreamDuplexer:
> > > > > 
> > > > >         // At present the consumer can be forced to consume
> > data
> > > > >         // over its declared capacity in order to avoid
> > having
> > > > >         // unprocessed message body content stuck in the
> > session
> > > > >         // input buffer
> > > > > 
> > > > > Does that refer to just the case where the capacity starts
> > > > > positive
> > > > > but
> > > > > data exceeding the capacity is delivered to consume()? Or
> > does it
> > > > > refer to
> > > > > the behavior I see, which is that capacity updates (or the
> > lack
> > > > > of
> > > > > them)
> > > > > don't seem to have any effect for HTTP/1.1?
> > > > > 
> > > > 
> > > > It is the former. Whatever data read from the underlying socket
> > > > channel
> > > > into the session buffer will be force-fed into the consumer
> > > > regardless
> > > > of its declared capacity for HTTP/1.1 connections.
> > > > 
> > > > > I've also tried running the
> > > > > Http1IntegrationTest.testSlowResponseConsumer()
> > > > > test, substituting this line to trigger updateCapacity()
> > calls:
> > > > > 
> > > > > 
> > > > >
> > client.start(Http1Config.custom().setBufferSize(256).setInitialWi
> > > > > ndow
> > > > > Size(32).build());
> > > > > 
> > > > 
> > > > This change alone will have little effect without adjusting the
> > > > session
> > > > buffer side as well. Given that the default session buffer size
> > is
> > > > 8192
> > > > with 32 byte initial capacity window one is likely to get
> > multiple
> > > > #consume invocations with negative capacity.
> > > > 
> > > > 
> > > > > By adding the small initial window, I can see the
> > capacityWindow
> > > > > going more
> > > > > and more negative on each consume() call with all the data
> > > > > buffered
> > > > > before
> > > > > the test code completes its first sleep.
> > > > > 
> > > > > I don't have a specific use case for a slow consumer, just
> > want
> > > > > to
> > > > > know if
> > > > > I'm misunderstanding something.
> > > > > 
> > > > 
> > > > I realized that we likely need to adjust the session buffer
> > size
> > > > automatically when the initial window setting is below than
> > value.
> > > > 
> > > > Hope this helps
> > > > 
> > > > Oleg
> > > > 
> > > > > Thanks!
> > > > > Roy
> > > > > 
> > > > > On Fri, Sep 6, 2019 at 10:15 AM Roy Hashimoto <
> > > > > roy.hashimoto@gmail.com>
> > > > > wrote:
> > > > > 
> > > > > > Those are good leads, I'll pursue them.
> > > > > > 
> > > > > > Thanks!
> > > > > > Roy
> > > > > > 
> > > > > > On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski <
> > > > > > olegk@apache.org>
> > > > > > wrote:
> > > > > > 
> > > > > > > On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt wrote:
> > > > > > > > Have you looked at the reactive extensions for
> > HttpCore5?
> > > > > > > > They
> > > > > > > > demonstrate
> > > > > > > > how to implement AsyncEntityProducer/AsyncDataProducer
> > with
> > > > > > > > support
> > > > > > > > for
> > > > > > > > backpressure (or you can just use the Reactive Streams
> > API
> > > > > > > > instead):
> > > > > > > > 
> > > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > 
> > > > 
> > 
https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive
> > > > > > > > 
> > > > > > > > 
> > > > > > > 
> > > > > > > Just a bit of background. In 5.0 one can no longer assume
> > > > > > > that
> > > > > > > one
> > > > > > > message exchange has exclusive ownership of the
> > underlying
> > > > > > > connection.
> > > > > > > Multiplexed message exchanges in HTTP/2 and piplelined
> > > > > > > message
> > > > > > > exchanges in HTTP/1.1 must not block other concurrent
> > > > > > > exchanges.
> > > > > > > Message changes however can update their current capacity
> > via
> > > > > > > `CapacityChannel`. Reactive extensions is a great example
> > and
> > > > > > > also an
> > > > > > > alternative to the native APIs per Ryan's recommendation.
> > > > > > > 
> > > > > > > If you prefer the native APIs you can take a look at the
> > > > > > > classic
> > > > > > > I/O
> > > > > > > adaptors that essentially emulate the classic blocking
> > i/o on
> > > > > > > top
> > > > > > > of
> > > > > > > the new async APIs [1] or HTTP/1.1 integration tests [2]
> > that
> > > > > > > have a
> > > > > > > number of 'slow' consumer / producer test cases.
> > > > > > > 
> > > > > > > Cheers
> > > > > > > 
> > > > > > > Oleg
> > > > > > > 
> > > > > > > [1]
> > > > > > > 
> > > > 
> > > > 
> > 
https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic
> > > > > > > [2]
> > > > > > > 
> > > > 
> > > > 
> > 
https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
> > > > > > > 
> > > > > > > 
> > > > > > > > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto <
> > > > > > > > roy.hashimoto@gmail.com
> > > > > > > > > 
> > > > > > > > 
> > > > > > > > wrote:
> > > > > > > > 
> > > > > > > > > I'm playing with asynchronous handlers in HttpCore 5,
> > and
> > > > > > > > > I'd
> > > > > > > > > like
> > > > > > > > > to have
> > > > > > > > > an AsyncEntityProducer write data at its own (slow)
> > rate
> > > > > > > > > like
> > > > > > > > > in
> > > > > > > > > this old
> > > > > > > > > thread <
> > > > > > > > > 
> > > > 
> > > > 
> > https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2
> > > > > > > > > > .
> > > > > > > > > 
> > > > > > > > > Writing to the DataStreamChannel whenever I want -
> > > > > > > > > outside
> > > > > > > > > the
> > > > > > > > > scope of a
> > > > > > > > > produce() method call - works fine, but I notice that
> > > > > > > > > produce() is
> > > > > > > > > being
> > > > > > > > > called every 5-6 milliseconds which ideally I would
> > like
> > > > > > > > > to
> > > > > > > > > eliminate or
> > > > > > > > > reduce.
> > > > > > > > > 
> > > > > > > > > The answer in the old thread was to use
> > > > > > > > > IOControl.suspendOutput()
> > > > > > > > > and
> > > > > > > > > IOControl.requestOutput(), but this class appears no
> > > > > > > > > longer
> > > > > > > > > to be
> > > > > > > > > in
> > > > > > > > > HttpCore 5. I see that there is a
> > > > > > > > > DataStreamChannel.requestOutput()
> > > > > > > > > but I
> > > > > > > > > haven't figured out what suspension call that should
> > be
> > > > > > > > > paired
> > > > > > > > > with. I have
> > > > > > > > > tried simply returning 0 from my
> > > > > > > > > AsyncEntityProducer.available()
> > > > > > > > > override,
> > > > > > > > > but that doesn't seem to be it.
> > > > > > > > > 
> > > > > > > > > Is there a new way to suspend/resume output in
> > HttpCore
> > > > > > > > > 5?
> > > > > > > > > 
> > > > > > > > > Thanks!
> > > > > > > > > Roy
> > > > > > > > > 
> > > > > > > > > Kotlin source here
> > > > > > > > > <
> > > > > > > > > 
> > > > 
> > > > 
> > https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250
> > > > > > > > > > .
> > > > > > > 
> > > > > > > 
> > > > > > > -------------------------------------------------------
> > ----
> > > > > > > ----
> > > > > > > ------
> > > > > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > > > > > 
> > > > > > > 
> > > > 
> > > > 
> > > > -------------------------------------------------------------
> > ----
> > > > ----
> > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > > 
> > > > 
> > 
> > 
> > -----------------------------------------------------------------
> > ----
> > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > For additional commands, e-mail: dev-help@hc.apache.org
> > 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> For additional commands, e-mail: dev-help@hc.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Roy Hashimoto <ro...@gmail.com>.
If this is a real issue, I believe it happens with
AbstractHttp1StreamDuplexer when consuming entities on both server and
client. Here is a sample server program
<https://gist.github.com/rhashimoto/76e4177446f1bb554659d733ef16b9be> to
investigate the behavior. It would probably be easier to test with a client
program but I've been using the server API so that was quicker for me to
write.

The sample is pretty minimal. All I've done is set the HTTP1 initial window
size to 8192 and add an AsyncExchangeHandler that creates an empty response
and logs the AsyncDataConsumer method implementations to the console. Note
that *I intentionally never update the CapacityChannel* - I have
effectively made an infinitely slow consumer.

To test, post data of more than 8 KB to the server (e.g. using curl). I
expect to see in the log consume() calls totaling at least 8192 and nothing
after that until the socket times out. Instead, what I see is consume()
calls delivering all the data and then streamEnd().

Here's sample output for uploading almost 32 KB. I have used my IDE to add
a logging breakpoint to show the capacity window on this line
<https://github.com/apache/httpcomponents-core/blob/a60528ea58877d55dab266bd2813e065aac6ff2c/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java#L313>,
so those are also included in the output below (in green if you're viewing
HTML mail).

Mon Sep 09, 2019 10:27:18.893 AM SlowConsumerTest main FINE: Listening on
/0:0:0:0:0:0:0:0:8080
Mon Sep 09, 2019 10:27:31.294 AM SlowConsumerTest$MyExchangeHandler
handleRequest FINEST: handleRequest called
Mon Sep 09, 2019 10:27:31.332 AM SlowConsumerTest$MyExchangeHandler consume
FINEST: consume 8192 total 8192
AbstractHttp1StreamDuplexer:313 capacity=0
Mon Sep 09, 2019 10:27:31.341 AM SlowConsumerTest$MyExchangeHandler
updateCapacity FINEST: updateCapacity called
Mon Sep 09, 2019 10:27:31.342 AM SlowConsumerTest$MyExchangeHandler consume
FINEST: consume 8192 total 16384
AbstractHttp1StreamDuplexer:313 capacity=-8192
Mon Sep 09, 2019 10:27:31.346 AM SlowConsumerTest$MyExchangeHandler
updateCapacity FINEST: updateCapacity called
Mon Sep 09, 2019 10:27:31.348 AM SlowConsumerTest$MyExchangeHandler consume
FINEST: consume 8192 total 24576
AbstractHttp1StreamDuplexer:313 capacity=-16384
Mon Sep 09, 2019 10:27:31.352 AM SlowConsumerTest$MyExchangeHandler
updateCapacity FINEST: updateCapacity called
Mon Sep 09, 2019 10:27:31.354 AM SlowConsumerTest$MyExchangeHandler consume
FINEST: consume 8150 total 32726
AbstractHttp1StreamDuplexer:313 capacity=-24534
Mon Sep 09, 2019 10:27:31.357 AM SlowConsumerTest$MyExchangeHandler
streamEnd FINEST: streamEnd called

My expectation is that there would no calls to consume() once capacity
reached 0 and we would never see streamEnd().

Roy

On Mon, Sep 9, 2019 at 8:10 AM Oleg Kalnichevski <ol...@apache.org> wrote:

> On Mon, 2019-09-09 at 07:58 -0700, Roy Hashimoto wrote:
> > >
> > > >
> > >
> > > client.start(Http1Config.custom().setBufferSize(256).setInitialWind
> > > owSize(32).build());
> > > >
> > >
> > > This change alone will have little effect without adjusting the
> > > session
> > > buffer side as well. Given that the default session buffer size is
> > > 8192
> > > with 32 byte initial capacity window one is likely to get multiple
> > > #consume invocations with negative capacity.
> >
> >
> > Hmm. Isn't setBufferSize(256) for the session buffer? Also, if I
> > increase
> > the amount of request data in the test to more than 8 KB:
> >
> >   return new MultiLineResponseHandler("0123456789abcd", 1000);
> >
> > Then I can see the capacity window go far more negative than -8192,
> > which I
> > wouldn't expect with your explanation.
> >
> > It looks like AbstractHttp1StreamDuplexer reads its data with a
> > blocking
> > read without explicitly waiting on the NIO Selector configured by
> > CapacityChannel (unless I missed it in the code). Does this type of
> > read
> > implicitly use the Selector? It doesn't appear to me that it does.
> > For
> > example, I can reach this line
> > <
> >
> https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L272
> > >
> > right
> > after the blocking read() when session.ioSession.key.interestOps is
> > 0.
> >
> > Don't waste time on this if I'm not making sense. I'm not that
> > familiar
> > with the NIO API, and I don't have a use case that depends on this.
> > Just
> > wanted to bring it to someone's attention in case it indicated an
> > issue.
> >
> > Roy
>
> Roy
>
> I am not sure we are talking about the same things because it do not
> even understand if you are talking about the server side, client side
> or both. Can you put together a test application that reproduces the
> issue and describe what think is wrong?
>
> Oleg
>
> >
> > On Mon, Sep 9, 2019 at 2:02 AM Oleg Kalnichevski <ol...@apache.org>
> > wrote:
> >
> > > On Sun, 2019-09-08 at 17:07 -0700, Roy Hashimoto wrote:
> > > > By looking at the suggested 5.0 examples I was able to get an
> > > > AsyncServerExchangeHandler subclass to play nicely with Kotlin
> > > > coroutines
> > > > on the AsyncDataProducer side of things, i.e. minimizing
> > > > produce()
> > > > polling
> > > > and avoiding buffering.
> > > >
> > > > I haven't been as successful with throttling calls on the
> > > > AsyncDataConsumer
> > > > side, i.e. consume() calls keep being made even though the
> > > > capacity
> > > > window
> > > > has gone negative. I think this might be the expected behavior
> > > > because of
> > > > this comment in AbstractHttp1StreamDuplexer:
> > > >
> > > >         // At present the consumer can be forced to consume data
> > > >         // over its declared capacity in order to avoid having
> > > >         // unprocessed message body content stuck in the session
> > > >         // input buffer
> > > >
> > > > Does that refer to just the case where the capacity starts
> > > > positive
> > > > but
> > > > data exceeding the capacity is delivered to consume()? Or does it
> > > > refer to
> > > > the behavior I see, which is that capacity updates (or the lack
> > > > of
> > > > them)
> > > > don't seem to have any effect for HTTP/1.1?
> > > >
> > >
> > > It is the former. Whatever data read from the underlying socket
> > > channel
> > > into the session buffer will be force-fed into the consumer
> > > regardless
> > > of its declared capacity for HTTP/1.1 connections.
> > >
> > > > I've also tried running the
> > > > Http1IntegrationTest.testSlowResponseConsumer()
> > > > test, substituting this line to trigger updateCapacity() calls:
> > > >
> > > >
> > > > client.start(Http1Config.custom().setBufferSize(256).setInitialWi
> > > > ndow
> > > > Size(32).build());
> > > >
> > >
> > > This change alone will have little effect without adjusting the
> > > session
> > > buffer side as well. Given that the default session buffer size is
> > > 8192
> > > with 32 byte initial capacity window one is likely to get multiple
> > > #consume invocations with negative capacity.
> > >
> > >
> > > > By adding the small initial window, I can see the capacityWindow
> > > > going more
> > > > and more negative on each consume() call with all the data
> > > > buffered
> > > > before
> > > > the test code completes its first sleep.
> > > >
> > > > I don't have a specific use case for a slow consumer, just want
> > > > to
> > > > know if
> > > > I'm misunderstanding something.
> > > >
> > >
> > > I realized that we likely need to adjust the session buffer size
> > > automatically when the initial window setting is below than value.
> > >
> > > Hope this helps
> > >
> > > Oleg
> > >
> > > > Thanks!
> > > > Roy
> > > >
> > > > On Fri, Sep 6, 2019 at 10:15 AM Roy Hashimoto <
> > > > roy.hashimoto@gmail.com>
> > > > wrote:
> > > >
> > > > > Those are good leads, I'll pursue them.
> > > > >
> > > > > Thanks!
> > > > > Roy
> > > > >
> > > > > On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski <
> > > > > olegk@apache.org>
> > > > > wrote:
> > > > >
> > > > > > On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt wrote:
> > > > > > > Have you looked at the reactive extensions for HttpCore5?
> > > > > > > They
> > > > > > > demonstrate
> > > > > > > how to implement AsyncEntityProducer/AsyncDataProducer with
> > > > > > > support
> > > > > > > for
> > > > > > > backpressure (or you can just use the Reactive Streams API
> > > > > > > instead):
> > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > >
> > >
>
> https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive
> > > > > > >
> > > > > > >
> > > > > >
> > > > > > Just a bit of background. In 5.0 one can no longer assume
> > > > > > that
> > > > > > one
> > > > > > message exchange has exclusive ownership of the underlying
> > > > > > connection.
> > > > > > Multiplexed message exchanges in HTTP/2 and piplelined
> > > > > > message
> > > > > > exchanges in HTTP/1.1 must not block other concurrent
> > > > > > exchanges.
> > > > > > Message changes however can update their current capacity via
> > > > > > `CapacityChannel`. Reactive extensions is a great example and
> > > > > > also an
> > > > > > alternative to the native APIs per Ryan's recommendation.
> > > > > >
> > > > > > If you prefer the native APIs you can take a look at the
> > > > > > classic
> > > > > > I/O
> > > > > > adaptors that essentially emulate the classic blocking i/o on
> > > > > > top
> > > > > > of
> > > > > > the new async APIs [1] or HTTP/1.1 integration tests [2] that
> > > > > > have a
> > > > > > number of 'slow' consumer / producer test cases.
> > > > > >
> > > > > > Cheers
> > > > > >
> > > > > > Oleg
> > > > > >
> > > > > > [1]
> > > > > >
> > >
> > >
>
> https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic
> > > > > > [2]
> > > > > >
> > >
> > >
>
> https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
> > > > > >
> > > > > >
> > > > > > > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto <
> > > > > > > roy.hashimoto@gmail.com
> > > > > > > >
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > I'm playing with asynchronous handlers in HttpCore 5, and
> > > > > > > > I'd
> > > > > > > > like
> > > > > > > > to have
> > > > > > > > an AsyncEntityProducer write data at its own (slow) rate
> > > > > > > > like
> > > > > > > > in
> > > > > > > > this old
> > > > > > > > thread <
> > > > > > > >
> > >
> > > https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2
> > > > > > > > > .
> > > > > > > >
> > > > > > > > Writing to the DataStreamChannel whenever I want -
> > > > > > > > outside
> > > > > > > > the
> > > > > > > > scope of a
> > > > > > > > produce() method call - works fine, but I notice that
> > > > > > > > produce() is
> > > > > > > > being
> > > > > > > > called every 5-6 milliseconds which ideally I would like
> > > > > > > > to
> > > > > > > > eliminate or
> > > > > > > > reduce.
> > > > > > > >
> > > > > > > > The answer in the old thread was to use
> > > > > > > > IOControl.suspendOutput()
> > > > > > > > and
> > > > > > > > IOControl.requestOutput(), but this class appears no
> > > > > > > > longer
> > > > > > > > to be
> > > > > > > > in
> > > > > > > > HttpCore 5. I see that there is a
> > > > > > > > DataStreamChannel.requestOutput()
> > > > > > > > but I
> > > > > > > > haven't figured out what suspension call that should be
> > > > > > > > paired
> > > > > > > > with. I have
> > > > > > > > tried simply returning 0 from my
> > > > > > > > AsyncEntityProducer.available()
> > > > > > > > override,
> > > > > > > > but that doesn't seem to be it.
> > > > > > > >
> > > > > > > > Is there a new way to suspend/resume output in HttpCore
> > > > > > > > 5?
> > > > > > > >
> > > > > > > > Thanks!
> > > > > > > > Roy
> > > > > > > >
> > > > > > > > Kotlin source here
> > > > > > > > <
> > > > > > > >
> > >
> > > https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250
> > > > > > > > > .
> > > > > >
> > > > > >
> > > > > > -----------------------------------------------------------
> > > > > > ----
> > > > > > ------
> > > > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > > > >
> > > > > >
> > >
> > >
> > > -----------------------------------------------------------------
> > > ----
> > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > For additional commands, e-mail: dev-help@hc.apache.org
> > >
> > >
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> For additional commands, e-mail: dev-help@hc.apache.org
>
>

Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Oleg Kalnichevski <ol...@apache.org>.
On Mon, 2019-09-09 at 07:58 -0700, Roy Hashimoto wrote:
> > 
> > > 
> > 
> > client.start(Http1Config.custom().setBufferSize(256).setInitialWind
> > owSize(32).build());
> > > 
> > 
> > This change alone will have little effect without adjusting the
> > session
> > buffer side as well. Given that the default session buffer size is
> > 8192
> > with 32 byte initial capacity window one is likely to get multiple
> > #consume invocations with negative capacity.
> 
> 
> Hmm. Isn't setBufferSize(256) for the session buffer? Also, if I
> increase
> the amount of request data in the test to more than 8 KB:
> 
>   return new MultiLineResponseHandler("0123456789abcd", 1000);
> 
> Then I can see the capacity window go far more negative than -8192,
> which I
> wouldn't expect with your explanation.
> 
> It looks like AbstractHttp1StreamDuplexer reads its data with a
> blocking
> read without explicitly waiting on the NIO Selector configured by
> CapacityChannel (unless I missed it in the code). Does this type of
> read
> implicitly use the Selector? It doesn't appear to me that it does.
> For
> example, I can reach this line
> <
> https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L272
> >
> right
> after the blocking read() when session.ioSession.key.interestOps is
> 0.
> 
> Don't waste time on this if I'm not making sense. I'm not that
> familiar
> with the NIO API, and I don't have a use case that depends on this.
> Just
> wanted to bring it to someone's attention in case it indicated an
> issue.
> 
> Roy

Roy

I am not sure we are talking about the same things because it do not
even understand if you are talking about the server side, client side
or both. Can you put together a test application that reproduces the
issue and describe what think is wrong?

Oleg

> 
> On Mon, Sep 9, 2019 at 2:02 AM Oleg Kalnichevski <ol...@apache.org>
> wrote:
> 
> > On Sun, 2019-09-08 at 17:07 -0700, Roy Hashimoto wrote:
> > > By looking at the suggested 5.0 examples I was able to get an
> > > AsyncServerExchangeHandler subclass to play nicely with Kotlin
> > > coroutines
> > > on the AsyncDataProducer side of things, i.e. minimizing
> > > produce()
> > > polling
> > > and avoiding buffering.
> > > 
> > > I haven't been as successful with throttling calls on the
> > > AsyncDataConsumer
> > > side, i.e. consume() calls keep being made even though the
> > > capacity
> > > window
> > > has gone negative. I think this might be the expected behavior
> > > because of
> > > this comment in AbstractHttp1StreamDuplexer:
> > > 
> > >         // At present the consumer can be forced to consume data
> > >         // over its declared capacity in order to avoid having
> > >         // unprocessed message body content stuck in the session
> > >         // input buffer
> > > 
> > > Does that refer to just the case where the capacity starts
> > > positive
> > > but
> > > data exceeding the capacity is delivered to consume()? Or does it
> > > refer to
> > > the behavior I see, which is that capacity updates (or the lack
> > > of
> > > them)
> > > don't seem to have any effect for HTTP/1.1?
> > > 
> > 
> > It is the former. Whatever data read from the underlying socket
> > channel
> > into the session buffer will be force-fed into the consumer
> > regardless
> > of its declared capacity for HTTP/1.1 connections.
> > 
> > > I've also tried running the
> > > Http1IntegrationTest.testSlowResponseConsumer()
> > > test, substituting this line to trigger updateCapacity() calls:
> > > 
> > > 
> > > client.start(Http1Config.custom().setBufferSize(256).setInitialWi
> > > ndow
> > > Size(32).build());
> > > 
> > 
> > This change alone will have little effect without adjusting the
> > session
> > buffer side as well. Given that the default session buffer size is
> > 8192
> > with 32 byte initial capacity window one is likely to get multiple
> > #consume invocations with negative capacity.
> > 
> > 
> > > By adding the small initial window, I can see the capacityWindow
> > > going more
> > > and more negative on each consume() call with all the data
> > > buffered
> > > before
> > > the test code completes its first sleep.
> > > 
> > > I don't have a specific use case for a slow consumer, just want
> > > to
> > > know if
> > > I'm misunderstanding something.
> > > 
> > 
> > I realized that we likely need to adjust the session buffer size
> > automatically when the initial window setting is below than value.
> > 
> > Hope this helps
> > 
> > Oleg
> > 
> > > Thanks!
> > > Roy
> > > 
> > > On Fri, Sep 6, 2019 at 10:15 AM Roy Hashimoto <
> > > roy.hashimoto@gmail.com>
> > > wrote:
> > > 
> > > > Those are good leads, I'll pursue them.
> > > > 
> > > > Thanks!
> > > > Roy
> > > > 
> > > > On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski <
> > > > olegk@apache.org>
> > > > wrote:
> > > > 
> > > > > On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt wrote:
> > > > > > Have you looked at the reactive extensions for HttpCore5?
> > > > > > They
> > > > > > demonstrate
> > > > > > how to implement AsyncEntityProducer/AsyncDataProducer with
> > > > > > support
> > > > > > for
> > > > > > backpressure (or you can just use the Reactive Streams API
> > > > > > instead):
> > > > > > 
> > > > > > 
> > > > > 
> > > > > 
> > 
> > 
https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive
> > > > > > 
> > > > > > 
> > > > > 
> > > > > Just a bit of background. In 5.0 one can no longer assume
> > > > > that
> > > > > one
> > > > > message exchange has exclusive ownership of the underlying
> > > > > connection.
> > > > > Multiplexed message exchanges in HTTP/2 and piplelined
> > > > > message
> > > > > exchanges in HTTP/1.1 must not block other concurrent
> > > > > exchanges.
> > > > > Message changes however can update their current capacity via
> > > > > `CapacityChannel`. Reactive extensions is a great example and
> > > > > also an
> > > > > alternative to the native APIs per Ryan's recommendation.
> > > > > 
> > > > > If you prefer the native APIs you can take a look at the
> > > > > classic
> > > > > I/O
> > > > > adaptors that essentially emulate the classic blocking i/o on
> > > > > top
> > > > > of
> > > > > the new async APIs [1] or HTTP/1.1 integration tests [2] that
> > > > > have a
> > > > > number of 'slow' consumer / producer test cases.
> > > > > 
> > > > > Cheers
> > > > > 
> > > > > Oleg
> > > > > 
> > > > > [1]
> > > > > 
> > 
> > 
https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic
> > > > > [2]
> > > > > 
> > 
> > 
https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
> > > > > 
> > > > > 
> > > > > > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto <
> > > > > > roy.hashimoto@gmail.com
> > > > > > > 
> > > > > > 
> > > > > > wrote:
> > > > > > 
> > > > > > > I'm playing with asynchronous handlers in HttpCore 5, and
> > > > > > > I'd
> > > > > > > like
> > > > > > > to have
> > > > > > > an AsyncEntityProducer write data at its own (slow) rate
> > > > > > > like
> > > > > > > in
> > > > > > > this old
> > > > > > > thread <
> > > > > > > 
> > 
> > https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2
> > > > > > > > .
> > > > > > > 
> > > > > > > Writing to the DataStreamChannel whenever I want -
> > > > > > > outside
> > > > > > > the
> > > > > > > scope of a
> > > > > > > produce() method call - works fine, but I notice that
> > > > > > > produce() is
> > > > > > > being
> > > > > > > called every 5-6 milliseconds which ideally I would like
> > > > > > > to
> > > > > > > eliminate or
> > > > > > > reduce.
> > > > > > > 
> > > > > > > The answer in the old thread was to use
> > > > > > > IOControl.suspendOutput()
> > > > > > > and
> > > > > > > IOControl.requestOutput(), but this class appears no
> > > > > > > longer
> > > > > > > to be
> > > > > > > in
> > > > > > > HttpCore 5. I see that there is a
> > > > > > > DataStreamChannel.requestOutput()
> > > > > > > but I
> > > > > > > haven't figured out what suspension call that should be
> > > > > > > paired
> > > > > > > with. I have
> > > > > > > tried simply returning 0 from my
> > > > > > > AsyncEntityProducer.available()
> > > > > > > override,
> > > > > > > but that doesn't seem to be it.
> > > > > > > 
> > > > > > > Is there a new way to suspend/resume output in HttpCore
> > > > > > > 5?
> > > > > > > 
> > > > > > > Thanks!
> > > > > > > Roy
> > > > > > > 
> > > > > > > Kotlin source here
> > > > > > > <
> > > > > > > 
> > 
> > https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250
> > > > > > > > .
> > > > > 
> > > > > 
> > > > > -----------------------------------------------------------
> > > > > ----
> > > > > ------
> > > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > > > 
> > > > > 
> > 
> > 
> > -----------------------------------------------------------------
> > ----
> > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > For additional commands, e-mail: dev-help@hc.apache.org
> > 
> > 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Roy Hashimoto <ro...@gmail.com>.
>
> >
> client.start(Http1Config.custom().setBufferSize(256).setInitialWindowSize(32).build());
> >
> This change alone will have little effect without adjusting the session
> buffer side as well. Given that the default session buffer size is 8192
> with 32 byte initial capacity window one is likely to get multiple
> #consume invocations with negative capacity.


Hmm. Isn't setBufferSize(256) for the session buffer? Also, if I increase
the amount of request data in the test to more than 8 KB:

  return new MultiLineResponseHandler("0123456789abcd", 1000);

Then I can see the capacity window go far more negative than -8192, which I
wouldn't expect with your explanation.

It looks like AbstractHttp1StreamDuplexer reads its data with a blocking
read without explicitly waiting on the NIO Selector configured by
CapacityChannel (unless I missed it in the code). Does this type of read
implicitly use the Selector? It doesn't appear to me that it does. For
example, I can reach this line
<https://github.com/apache/httpcomponents-core/blob/9a1b26e06e35220fb349de2e8c4197c1ac87dcf9/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java#L272>
right
after the blocking read() when session.ioSession.key.interestOps is 0.

Don't waste time on this if I'm not making sense. I'm not that familiar
with the NIO API, and I don't have a use case that depends on this. Just
wanted to bring it to someone's attention in case it indicated an issue.

Roy

On Mon, Sep 9, 2019 at 2:02 AM Oleg Kalnichevski <ol...@apache.org> wrote:

> On Sun, 2019-09-08 at 17:07 -0700, Roy Hashimoto wrote:
> > By looking at the suggested 5.0 examples I was able to get an
> > AsyncServerExchangeHandler subclass to play nicely with Kotlin
> > coroutines
> > on the AsyncDataProducer side of things, i.e. minimizing produce()
> > polling
> > and avoiding buffering.
> >
> > I haven't been as successful with throttling calls on the
> > AsyncDataConsumer
> > side, i.e. consume() calls keep being made even though the capacity
> > window
> > has gone negative. I think this might be the expected behavior
> > because of
> > this comment in AbstractHttp1StreamDuplexer:
> >
> >         // At present the consumer can be forced to consume data
> >         // over its declared capacity in order to avoid having
> >         // unprocessed message body content stuck in the session
> >         // input buffer
> >
> > Does that refer to just the case where the capacity starts positive
> > but
> > data exceeding the capacity is delivered to consume()? Or does it
> > refer to
> > the behavior I see, which is that capacity updates (or the lack of
> > them)
> > don't seem to have any effect for HTTP/1.1?
> >
>
> It is the former. Whatever data read from the underlying socket channel
> into the session buffer will be force-fed into the consumer regardless
> of its declared capacity for HTTP/1.1 connections.
>
> > I've also tried running the
> > Http1IntegrationTest.testSlowResponseConsumer()
> > test, substituting this line to trigger updateCapacity() calls:
> >
> >
> > client.start(Http1Config.custom().setBufferSize(256).setInitialWindow
> > Size(32).build());
> >
>
> This change alone will have little effect without adjusting the session
> buffer side as well. Given that the default session buffer size is 8192
> with 32 byte initial capacity window one is likely to get multiple
> #consume invocations with negative capacity.
>
>
> > By adding the small initial window, I can see the capacityWindow
> > going more
> > and more negative on each consume() call with all the data buffered
> > before
> > the test code completes its first sleep.
> >
> > I don't have a specific use case for a slow consumer, just want to
> > know if
> > I'm misunderstanding something.
> >
>
> I realized that we likely need to adjust the session buffer size
> automatically when the initial window setting is below than value.
>
> Hope this helps
>
> Oleg
>
> > Thanks!
> > Roy
> >
> > On Fri, Sep 6, 2019 at 10:15 AM Roy Hashimoto <
> > roy.hashimoto@gmail.com>
> > wrote:
> >
> > > Those are good leads, I'll pursue them.
> > >
> > > Thanks!
> > > Roy
> > >
> > > On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski <ol...@apache.org>
> > > wrote:
> > >
> > > > On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt wrote:
> > > > > Have you looked at the reactive extensions for HttpCore5? They
> > > > > demonstrate
> > > > > how to implement AsyncEntityProducer/AsyncDataProducer with
> > > > > support
> > > > > for
> > > > > backpressure (or you can just use the Reactive Streams API
> > > > > instead):
> > > > >
> > > > >
> > > >
> > > >
>
> https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive
> > > > >
> > > > >
> > > >
> > > > Just a bit of background. In 5.0 one can no longer assume that
> > > > one
> > > > message exchange has exclusive ownership of the underlying
> > > > connection.
> > > > Multiplexed message exchanges in HTTP/2 and piplelined message
> > > > exchanges in HTTP/1.1 must not block other concurrent exchanges.
> > > > Message changes however can update their current capacity via
> > > > `CapacityChannel`. Reactive extensions is a great example and
> > > > also an
> > > > alternative to the native APIs per Ryan's recommendation.
> > > >
> > > > If you prefer the native APIs you can take a look at the classic
> > > > I/O
> > > > adaptors that essentially emulate the classic blocking i/o on top
> > > > of
> > > > the new async APIs [1] or HTTP/1.1 integration tests [2] that
> > > > have a
> > > > number of 'slow' consumer / producer test cases.
> > > >
> > > > Cheers
> > > >
> > > > Oleg
> > > >
> > > > [1]
> > > >
>
> https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic
> > > > [2]
> > > >
>
> https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
> > > >
> > > >
> > > > > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto <
> > > > > roy.hashimoto@gmail.com
> > > > > >
> > > > >
> > > > > wrote:
> > > > >
> > > > > > I'm playing with asynchronous handlers in HttpCore 5, and I'd
> > > > > > like
> > > > > > to have
> > > > > > an AsyncEntityProducer write data at its own (slow) rate like
> > > > > > in
> > > > > > this old
> > > > > > thread <
> > > > > >
> https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2
> > > > > > > .
> > > > > >
> > > > > > Writing to the DataStreamChannel whenever I want - outside
> > > > > > the
> > > > > > scope of a
> > > > > > produce() method call - works fine, but I notice that
> > > > > > produce() is
> > > > > > being
> > > > > > called every 5-6 milliseconds which ideally I would like to
> > > > > > eliminate or
> > > > > > reduce.
> > > > > >
> > > > > > The answer in the old thread was to use
> > > > > > IOControl.suspendOutput()
> > > > > > and
> > > > > > IOControl.requestOutput(), but this class appears no longer
> > > > > > to be
> > > > > > in
> > > > > > HttpCore 5. I see that there is a
> > > > > > DataStreamChannel.requestOutput()
> > > > > > but I
> > > > > > haven't figured out what suspension call that should be
> > > > > > paired
> > > > > > with. I have
> > > > > > tried simply returning 0 from my
> > > > > > AsyncEntityProducer.available()
> > > > > > override,
> > > > > > but that doesn't seem to be it.
> > > > > >
> > > > > > Is there a new way to suspend/resume output in HttpCore 5?
> > > > > >
> > > > > > Thanks!
> > > > > > Roy
> > > > > >
> > > > > > Kotlin source here
> > > > > > <
> > > > > >
> https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250
> > > > > > > .
> > > >
> > > >
> > > > ---------------------------------------------------------------
> > > > ------
> > > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > > For additional commands, e-mail: dev-help@hc.apache.org
> > > >
> > > >
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> For additional commands, e-mail: dev-help@hc.apache.org
>
>

Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Oleg Kalnichevski <ol...@apache.org>.
On Sun, 2019-09-08 at 17:07 -0700, Roy Hashimoto wrote:
> By looking at the suggested 5.0 examples I was able to get an
> AsyncServerExchangeHandler subclass to play nicely with Kotlin
> coroutines
> on the AsyncDataProducer side of things, i.e. minimizing produce()
> polling
> and avoiding buffering.
> 
> I haven't been as successful with throttling calls on the
> AsyncDataConsumer
> side, i.e. consume() calls keep being made even though the capacity
> window
> has gone negative. I think this might be the expected behavior
> because of
> this comment in AbstractHttp1StreamDuplexer:
> 
>         // At present the consumer can be forced to consume data
>         // over its declared capacity in order to avoid having
>         // unprocessed message body content stuck in the session
>         // input buffer
> 
> Does that refer to just the case where the capacity starts positive
> but
> data exceeding the capacity is delivered to consume()? Or does it
> refer to
> the behavior I see, which is that capacity updates (or the lack of
> them)
> don't seem to have any effect for HTTP/1.1?
> 

It is the former. Whatever data read from the underlying socket channel
into the session buffer will be force-fed into the consumer regardless
of its declared capacity for HTTP/1.1 connections.

> I've also tried running the
> Http1IntegrationTest.testSlowResponseConsumer()
> test, substituting this line to trigger updateCapacity() calls:
> 
> 
> client.start(Http1Config.custom().setBufferSize(256).setInitialWindow
> Size(32).build());
> 

This change alone will have little effect without adjusting the session
buffer side as well. Given that the default session buffer size is 8192
with 32 byte initial capacity window one is likely to get multiple
#consume invocations with negative capacity. 


> By adding the small initial window, I can see the capacityWindow
> going more
> and more negative on each consume() call with all the data buffered
> before
> the test code completes its first sleep.
> 
> I don't have a specific use case for a slow consumer, just want to
> know if
> I'm misunderstanding something.
> 

I realized that we likely need to adjust the session buffer size
automatically when the initial window setting is below than value. 

Hope this helps

Oleg

> Thanks!
> Roy
> 
> On Fri, Sep 6, 2019 at 10:15 AM Roy Hashimoto <
> roy.hashimoto@gmail.com>
> wrote:
> 
> > Those are good leads, I'll pursue them.
> > 
> > Thanks!
> > Roy
> > 
> > On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski <ol...@apache.org>
> > wrote:
> > 
> > > On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt wrote:
> > > > Have you looked at the reactive extensions for HttpCore5? They
> > > > demonstrate
> > > > how to implement AsyncEntityProducer/AsyncDataProducer with
> > > > support
> > > > for
> > > > backpressure (or you can just use the Reactive Streams API
> > > > instead):
> > > > 
> > > > 
> > > 
> > > 
https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive
> > > > 
> > > > 
> > > 
> > > Just a bit of background. In 5.0 one can no longer assume that
> > > one
> > > message exchange has exclusive ownership of the underlying
> > > connection.
> > > Multiplexed message exchanges in HTTP/2 and piplelined message
> > > exchanges in HTTP/1.1 must not block other concurrent exchanges.
> > > Message changes however can update their current capacity via
> > > `CapacityChannel`. Reactive extensions is a great example and
> > > also an
> > > alternative to the native APIs per Ryan's recommendation.
> > > 
> > > If you prefer the native APIs you can take a look at the classic
> > > I/O
> > > adaptors that essentially emulate the classic blocking i/o on top
> > > of
> > > the new async APIs [1] or HTTP/1.1 integration tests [2] that
> > > have a
> > > number of 'slow' consumer / producer test cases.
> > > 
> > > Cheers
> > > 
> > > Oleg
> > > 
> > > [1]
> > > 
https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic
> > > [2]
> > > 
https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
> > > 
> > > 
> > > > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto <
> > > > roy.hashimoto@gmail.com
> > > > > 
> > > > 
> > > > wrote:
> > > > 
> > > > > I'm playing with asynchronous handlers in HttpCore 5, and I'd
> > > > > like
> > > > > to have
> > > > > an AsyncEntityProducer write data at its own (slow) rate like
> > > > > in
> > > > > this old
> > > > > thread <
> > > > > 
https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2
> > > > > > .
> > > > > 
> > > > > Writing to the DataStreamChannel whenever I want - outside
> > > > > the
> > > > > scope of a
> > > > > produce() method call - works fine, but I notice that
> > > > > produce() is
> > > > > being
> > > > > called every 5-6 milliseconds which ideally I would like to
> > > > > eliminate or
> > > > > reduce.
> > > > > 
> > > > > The answer in the old thread was to use
> > > > > IOControl.suspendOutput()
> > > > > and
> > > > > IOControl.requestOutput(), but this class appears no longer
> > > > > to be
> > > > > in
> > > > > HttpCore 5. I see that there is a
> > > > > DataStreamChannel.requestOutput()
> > > > > but I
> > > > > haven't figured out what suspension call that should be
> > > > > paired
> > > > > with. I have
> > > > > tried simply returning 0 from my
> > > > > AsyncEntityProducer.available()
> > > > > override,
> > > > > but that doesn't seem to be it.
> > > > > 
> > > > > Is there a new way to suspend/resume output in HttpCore 5?
> > > > > 
> > > > > Thanks!
> > > > > Roy
> > > > > 
> > > > > Kotlin source here
> > > > > <
> > > > > 
https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250
> > > > > > .
> > > 
> > > 
> > > ---------------------------------------------------------------
> > > ------
> > > To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> > > For additional commands, e-mail: dev-help@hc.apache.org
> > > 
> > > 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Ryan Schmitt <rs...@pobox.com>.
This question about forced data consumption has come up before:

https://github.com/apache/httpcomponents-core/pull/91#issuecomment-432868169

On Sun, Sep 8, 2019 at 5:08 PM Roy Hashimoto <ro...@gmail.com>
wrote:

> By looking at the suggested 5.0 examples I was able to get an
> AsyncServerExchangeHandler subclass to play nicely with Kotlin coroutines
> on the AsyncDataProducer side of things, i.e. minimizing produce() polling
> and avoiding buffering.
>
> I haven't been as successful with throttling calls on the AsyncDataConsumer
> side, i.e. consume() calls keep being made even though the capacity window
> has gone negative. I think this might be the expected behavior because of
> this comment in AbstractHttp1StreamDuplexer:
>
>         // At present the consumer can be forced to consume data
>         // over its declared capacity in order to avoid having
>         // unprocessed message body content stuck in the session
>         // input buffer
>
> Does that refer to just the case where the capacity starts positive but
> data exceeding the capacity is delivered to consume()? Or does it refer to
> the behavior I see, which is that capacity updates (or the lack of them)
> don't seem to have any effect for HTTP/1.1?
>
> I've also tried running the Http1IntegrationTest.testSlowResponseConsumer()
> test, substituting this line to trigger updateCapacity() calls:
>
>
>
> client.start(Http1Config.custom().setBufferSize(256).setInitialWindowSize(32).build());
>
> By adding the small initial window, I can see the capacityWindow going more
> and more negative on each consume() call with all the data buffered before
> the test code completes its first sleep.
>
> I don't have a specific use case for a slow consumer, just want to know if
> I'm misunderstanding something.
>
> Thanks!
> Roy
>
> On Fri, Sep 6, 2019 at 10:15 AM Roy Hashimoto <ro...@gmail.com>
> wrote:
>
> > Those are good leads, I'll pursue them.
> >
> > Thanks!
> > Roy
> >
> > On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski <ol...@apache.org>
> wrote:
> >
> >> On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt wrote:
> >> > Have you looked at the reactive extensions for HttpCore5? They
> >> > demonstrate
> >> > how to implement AsyncEntityProducer/AsyncDataProducer with support
> >> > for
> >> > backpressure (or you can just use the Reactive Streams API instead):
> >> >
> >> >
> >>
> >>
> https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive
> >> >
> >> >
> >>
> >> Just a bit of background. In 5.0 one can no longer assume that one
> >> message exchange has exclusive ownership of the underlying connection.
> >> Multiplexed message exchanges in HTTP/2 and piplelined message
> >> exchanges in HTTP/1.1 must not block other concurrent exchanges.
> >> Message changes however can update their current capacity via
> >> `CapacityChannel`. Reactive extensions is a great example and also an
> >> alternative to the native APIs per Ryan's recommendation.
> >>
> >> If you prefer the native APIs you can take a look at the classic I/O
> >> adaptors that essentially emulate the classic blocking i/o on top of
> >> the new async APIs [1] or HTTP/1.1 integration tests [2] that have a
> >> number of 'slow' consumer / producer test cases.
> >>
> >> Cheers
> >>
> >> Oleg
> >>
> >> [1]
> >>
> https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic
> >> [2]
> >>
> https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
> >>
> >>
> >> > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto <roy.hashimoto@gmail.com
> >> > >
> >> > wrote:
> >> >
> >> > > I'm playing with asynchronous handlers in HttpCore 5, and I'd like
> >> > > to have
> >> > > an AsyncEntityProducer write data at its own (slow) rate like in
> >> > > this old
> >> > > thread <
> >> > > https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2
> >> > > > .
> >> > >
> >> > > Writing to the DataStreamChannel whenever I want - outside the
> >> > > scope of a
> >> > > produce() method call - works fine, but I notice that produce() is
> >> > > being
> >> > > called every 5-6 milliseconds which ideally I would like to
> >> > > eliminate or
> >> > > reduce.
> >> > >
> >> > > The answer in the old thread was to use IOControl.suspendOutput()
> >> > > and
> >> > > IOControl.requestOutput(), but this class appears no longer to be
> >> > > in
> >> > > HttpCore 5. I see that there is a DataStreamChannel.requestOutput()
> >> > > but I
> >> > > haven't figured out what suspension call that should be paired
> >> > > with. I have
> >> > > tried simply returning 0 from my AsyncEntityProducer.available()
> >> > > override,
> >> > > but that doesn't seem to be it.
> >> > >
> >> > > Is there a new way to suspend/resume output in HttpCore 5?
> >> > >
> >> > > Thanks!
> >> > > Roy
> >> > >
> >> > > Kotlin source here
> >> > > <
> >> > > https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250
> >> > > >.
> >> > >
> >>
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> >> For additional commands, e-mail: dev-help@hc.apache.org
> >>
> >>
>

Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Roy Hashimoto <ro...@gmail.com>.
By looking at the suggested 5.0 examples I was able to get an
AsyncServerExchangeHandler subclass to play nicely with Kotlin coroutines
on the AsyncDataProducer side of things, i.e. minimizing produce() polling
and avoiding buffering.

I haven't been as successful with throttling calls on the AsyncDataConsumer
side, i.e. consume() calls keep being made even though the capacity window
has gone negative. I think this might be the expected behavior because of
this comment in AbstractHttp1StreamDuplexer:

        // At present the consumer can be forced to consume data
        // over its declared capacity in order to avoid having
        // unprocessed message body content stuck in the session
        // input buffer

Does that refer to just the case where the capacity starts positive but
data exceeding the capacity is delivered to consume()? Or does it refer to
the behavior I see, which is that capacity updates (or the lack of them)
don't seem to have any effect for HTTP/1.1?

I've also tried running the Http1IntegrationTest.testSlowResponseConsumer()
test, substituting this line to trigger updateCapacity() calls:


client.start(Http1Config.custom().setBufferSize(256).setInitialWindowSize(32).build());

By adding the small initial window, I can see the capacityWindow going more
and more negative on each consume() call with all the data buffered before
the test code completes its first sleep.

I don't have a specific use case for a slow consumer, just want to know if
I'm misunderstanding something.

Thanks!
Roy

On Fri, Sep 6, 2019 at 10:15 AM Roy Hashimoto <ro...@gmail.com>
wrote:

> Those are good leads, I'll pursue them.
>
> Thanks!
> Roy
>
> On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski <ol...@apache.org> wrote:
>
>> On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt wrote:
>> > Have you looked at the reactive extensions for HttpCore5? They
>> > demonstrate
>> > how to implement AsyncEntityProducer/AsyncDataProducer with support
>> > for
>> > backpressure (or you can just use the Reactive Streams API instead):
>> >
>> >
>>
>> https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive
>> >
>> >
>>
>> Just a bit of background. In 5.0 one can no longer assume that one
>> message exchange has exclusive ownership of the underlying connection.
>> Multiplexed message exchanges in HTTP/2 and piplelined message
>> exchanges in HTTP/1.1 must not block other concurrent exchanges.
>> Message changes however can update their current capacity via
>> `CapacityChannel`. Reactive extensions is a great example and also an
>> alternative to the native APIs per Ryan's recommendation.
>>
>> If you prefer the native APIs you can take a look at the classic I/O
>> adaptors that essentially emulate the classic blocking i/o on top of
>> the new async APIs [1] or HTTP/1.1 integration tests [2] that have a
>> number of 'slow' consumer / producer test cases.
>>
>> Cheers
>>
>> Oleg
>>
>> [1]
>> https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic
>> [2]
>> https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
>>
>>
>> > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto <roy.hashimoto@gmail.com
>> > >
>> > wrote:
>> >
>> > > I'm playing with asynchronous handlers in HttpCore 5, and I'd like
>> > > to have
>> > > an AsyncEntityProducer write data at its own (slow) rate like in
>> > > this old
>> > > thread <
>> > > https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2
>> > > > .
>> > >
>> > > Writing to the DataStreamChannel whenever I want - outside the
>> > > scope of a
>> > > produce() method call - works fine, but I notice that produce() is
>> > > being
>> > > called every 5-6 milliseconds which ideally I would like to
>> > > eliminate or
>> > > reduce.
>> > >
>> > > The answer in the old thread was to use IOControl.suspendOutput()
>> > > and
>> > > IOControl.requestOutput(), but this class appears no longer to be
>> > > in
>> > > HttpCore 5. I see that there is a DataStreamChannel.requestOutput()
>> > > but I
>> > > haven't figured out what suspension call that should be paired
>> > > with. I have
>> > > tried simply returning 0 from my AsyncEntityProducer.available()
>> > > override,
>> > > but that doesn't seem to be it.
>> > >
>> > > Is there a new way to suspend/resume output in HttpCore 5?
>> > >
>> > > Thanks!
>> > > Roy
>> > >
>> > > Kotlin source here
>> > > <
>> > > https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250
>> > > >.
>> > >
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
>> For additional commands, e-mail: dev-help@hc.apache.org
>>
>>

Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Roy Hashimoto <ro...@gmail.com>.
Those are good leads, I'll pursue them.

Thanks!
Roy

On Fri, Sep 6, 2019 at 9:57 AM Oleg Kalnichevski <ol...@apache.org> wrote:

> On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt wrote:
> > Have you looked at the reactive extensions for HttpCore5? They
> > demonstrate
> > how to implement AsyncEntityProducer/AsyncDataProducer with support
> > for
> > backpressure (or you can just use the Reactive Streams API instead):
> >
> >
>
> https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive
> >
> >
>
> Just a bit of background. In 5.0 one can no longer assume that one
> message exchange has exclusive ownership of the underlying connection.
> Multiplexed message exchanges in HTTP/2 and piplelined message
> exchanges in HTTP/1.1 must not block other concurrent exchanges.
> Message changes however can update their current capacity via
> `CapacityChannel`. Reactive extensions is a great example and also an
> alternative to the native APIs per Ryan's recommendation.
>
> If you prefer the native APIs you can take a look at the classic I/O
> adaptors that essentially emulate the classic blocking i/o on top of
> the new async APIs [1] or HTTP/1.1 integration tests [2] that have a
> number of 'slow' consumer / producer test cases.
>
> Cheers
>
> Oleg
>
> [1]
> https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic
> [2]
> https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
>
>
> > On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto <roy.hashimoto@gmail.com
> > >
> > wrote:
> >
> > > I'm playing with asynchronous handlers in HttpCore 5, and I'd like
> > > to have
> > > an AsyncEntityProducer write data at its own (slow) rate like in
> > > this old
> > > thread <
> > > https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2
> > > > .
> > >
> > > Writing to the DataStreamChannel whenever I want - outside the
> > > scope of a
> > > produce() method call - works fine, but I notice that produce() is
> > > being
> > > called every 5-6 milliseconds which ideally I would like to
> > > eliminate or
> > > reduce.
> > >
> > > The answer in the old thread was to use IOControl.suspendOutput()
> > > and
> > > IOControl.requestOutput(), but this class appears no longer to be
> > > in
> > > HttpCore 5. I see that there is a DataStreamChannel.requestOutput()
> > > but I
> > > haven't figured out what suspension call that should be paired
> > > with. I have
> > > tried simply returning 0 from my AsyncEntityProducer.available()
> > > override,
> > > but that doesn't seem to be it.
> > >
> > > Is there a new way to suspend/resume output in HttpCore 5?
> > >
> > > Thanks!
> > > Roy
> > >
> > > Kotlin source here
> > > <
> > > https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250
> > > >.
> > >
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
> For additional commands, e-mail: dev-help@hc.apache.org
>
>

Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Oleg Kalnichevski <ol...@apache.org>.
On Fri, 2019-09-06 at 09:43 -0700, Ryan Schmitt wrote:
> Have you looked at the reactive extensions for HttpCore5? They
> demonstrate
> how to implement AsyncEntityProducer/AsyncDataProducer with support
> for
> backpressure (or you can just use the Reactive Streams API instead):
> 
> 
https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive
> 
> 

Just a bit of background. In 5.0 one can no longer assume that one
message exchange has exclusive ownership of the underlying connection.
Multiplexed message exchanges in HTTP/2 and piplelined message
exchanges in HTTP/1.1 must not block other concurrent exchanges.
Message changes however can update their current capacity via
`CapacityChannel`. Reactive extensions is a great example and also an
alternative to the native APIs per Ryan's recommendation.

If you prefer the native APIs you can take a look at the classic I/O
adaptors that essentially emulate the classic blocking i/o on top of
the new async APIs [1] or HTTP/1.1 integration tests [2] that have a
number of 'slow' consumer / producer test cases.

Cheers

Oleg     

[1] https://github.com/apache/httpcomponents-core/tree/master/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic
[2] https://github.com/apache/httpcomponents-core/blob/master/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java


> On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto <roy.hashimoto@gmail.com
> >
> wrote:
> 
> > I'm playing with asynchronous handlers in HttpCore 5, and I'd like
> > to have
> > an AsyncEntityProducer write data at its own (slow) rate like in
> > this old
> > thread <
> > https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2
> > > .
> > 
> > Writing to the DataStreamChannel whenever I want - outside the
> > scope of a
> > produce() method call - works fine, but I notice that produce() is
> > being
> > called every 5-6 milliseconds which ideally I would like to
> > eliminate or
> > reduce.
> > 
> > The answer in the old thread was to use IOControl.suspendOutput()
> > and
> > IOControl.requestOutput(), but this class appears no longer to be
> > in
> > HttpCore 5. I see that there is a DataStreamChannel.requestOutput()
> > but I
> > haven't figured out what suspension call that should be paired
> > with. I have
> > tried simply returning 0 from my AsyncEntityProducer.available()
> > override,
> > but that doesn't seem to be it.
> > 
> > Is there a new way to suspend/resume output in HttpCore 5?
> > 
> > Thanks!
> > Roy
> > 
> > Kotlin source here
> > <
> > https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250
> > >.
> > 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org


Re: throttling calls to AsyncEntityProducer in HttpCore 5

Posted by Ryan Schmitt <rs...@apache.org>.
Have you looked at the reactive extensions for HttpCore5? They demonstrate
how to implement AsyncEntityProducer/AsyncDataProducer with support for
backpressure (or you can just use the Reactive Streams API instead):

https://github.com/apache/httpcomponents-core/tree/master/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive


On Fri, Sep 6, 2019 at 9:33 AM Roy Hashimoto <ro...@gmail.com>
wrote:

> I'm playing with asynchronous handlers in HttpCore 5, and I'd like to have
> an AsyncEntityProducer write data at its own (slow) rate like in this old
> thread <https://marc.info/?l=httpclient-commons-dev&m=134928851229305&w=2
> >.
> Writing to the DataStreamChannel whenever I want - outside the scope of a
> produce() method call - works fine, but I notice that produce() is being
> called every 5-6 milliseconds which ideally I would like to eliminate or
> reduce.
>
> The answer in the old thread was to use IOControl.suspendOutput() and
> IOControl.requestOutput(), but this class appears no longer to be in
> HttpCore 5. I see that there is a DataStreamChannel.requestOutput() but I
> haven't figured out what suspension call that should be paired with. I have
> tried simply returning 0 from my AsyncEntityProducer.available() override,
> but that doesn't seem to be it.
>
> Is there a new way to suspend/resume output in HttpCore 5?
>
> Thanks!
> Roy
>
> Kotlin source here
> <https://gist.github.com/rhashimoto/1f5501d3b5d2aa95251fe12f4f0be250>.
>