Posted to dev@beam.apache.org by Luke Cwik <lc...@google.com> on 2020/08/03 22:28:15 UTC

Re: Request Throttling in OSSIO

Since you are working on a new connector, I would very strongly suggest
writing it as a splittable DoFn instead of an UnboundedSource. See this
thread[1] for additional details and some caveats on the recommendation.

1) Instead of sleeping, you can return false from advance(); the runner will
call advance() again at some later point. This is also the correct thing to
do if you hit a throttling error. With a splittable DoFn, you can instead
return a process continuation that suggests how long to wait before the work
is resumed.
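A minimal sketch of point 1, written against the plain JDK rather than Beam so it runs on its own. `MessageClient`, `ThrottledException`, and `ThrottleAwareReader` are hypothetical names, not part of the OSS SDK or of Beam; a real connector would put this logic in a subclass of `UnboundedSource.UnboundedReader`, which also has to provide `start()`, `getCurrent()`, `getCurrentTimestamp()`, `getWatermark()`, `getCheckpointMark()`, `getCurrentSource()` and `close()`. Only the idea from the reply is modeled: when the previous request was less than the minimum interval ago, or the service reports throttling, `advance()` returns false instead of sleeping, and the runner calls it again later.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the OSS client; NOT the real OSS SDK API. */
interface MessageClient {
  /** Returns the next batch of messages, or throws ThrottledException when rate limited. */
  List<String> getMessages() throws ThrottledException;
}

/** Hypothetical exception for "requests limit exhausted". */
class ThrottledException extends Exception {
  ThrottledException(String message) {
    super(message);
  }
}

/** Models the advance()/getCurrent() contract of an UnboundedReader, without Beam. */
class ThrottleAwareReader {
  private final MessageClient client;
  private final LongSupplier clockMillis;  // injectable clock so tests need not wait
  private final long minIntervalMillis;    // e.g. 200 ms for OSS reads
  private final Deque<String> buffer = new ArrayDeque<>();
  private long nextAllowedRequestMillis = Long.MIN_VALUE;
  private String current;

  ThrottleAwareReader(MessageClient client, LongSupplier clockMillis, long minIntervalMillis) {
    this.client = client;
    this.clockMillis = clockMillis;
    this.minIntervalMillis = minIntervalMillis;
  }

  /** True if a new record is available; false means "nothing right now", and it never sleeps. */
  boolean advance() {
    if (buffer.isEmpty()) {
      long now = clockMillis.getAsLong();
      if (now < nextAllowedRequestMillis) {
        return false; // too early for another request: let the runner retry later
      }
      nextAllowedRequestMillis = now + minIntervalMillis;
      try {
        buffer.addAll(client.getMessages());
      } catch (ThrottledException e) {
        return false; // throttled: report "no data yet" instead of blocking
      }
      if (buffer.isEmpty()) {
        return false; // the request succeeded but returned nothing
      }
    }
    current = buffer.poll();
    return true;
  }

  String getCurrent() {
    return current;
  }
}
```

Buffered messages are handed out without touching the service, so only actual `getMessages()` calls are spaced by the interval. In Beam, a false return from `advance()` does not end the stream; it only tells the runner there is no record at the moment.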

2) It looks like null was returned as the checkpoint mark coder[2], i.e. your
source's getCheckpointMarkCoder() returned null.
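A small illustration of point 2. In the trace, the DirectRunner copies the checkpoint mark with `CoderUtils.clone`, which encodes the value with the checkpoint mark coder and decodes the bytes again; if the source's `getCheckpointMarkCoder()` returns null, the encode step fails with this kind of NullPointerException. For a `Serializable` checkpoint mark, returning `SerializableCoder.of(OssCheckpointMark.class)` from that method is one straightforward option. The JDK-only sketch below assumes a hypothetical `OssCheckpointMark` holding a partition name and next offset (the real OSS cursor fields may differ) and performs the same encode-then-decode round trip with Java serialization, which is what `SerializableCoder` relies on. A real mark would also implement `UnboundedSource.CheckpointMark` and its `finalizeCheckpoint()` method.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical checkpoint mark: the partition being read and the next offset to read. */
class OssCheckpointMark implements Serializable {
  private static final long serialVersionUID = 1L;
  final String partition;
  final long nextOffset;

  OssCheckpointMark(String partition, long nextOffset) {
    this.partition = partition;
    this.nextOffset = nextOffset;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof OssCheckpointMark)) {
      return false;
    }
    OssCheckpointMark other = (OssCheckpointMark) o;
    return partition.equals(other.partition) && nextOffset == other.nextOffset;
  }

  @Override
  public int hashCode() {
    return Objects.hash(partition, nextOffset);
  }
}

/** Mimics what CoderUtils.clone does with a coder: encode to bytes, then decode a copy. */
final class CheckpointCloner {
  static <T extends Serializable> T cloneViaSerialization(T value)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value); // encode step: this is where a null coder would blow up in Beam
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      @SuppressWarnings("unchecked")
      T copy = (T) in.readObject(); // decode step: produces an independent copy
      return copy;
    }
  }
}
```

Because the runner may round-trip the mark at any time, every field it holds must survive encoding; a cheap self-check like this one is a reasonable unit test for a custom mark.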

1:
https://lists.apache.org/thread.html/r76bac40fd22ebf96f379efbaef36fc27c65bdb859f504e19da76ff01%40%3Cdev.beam.apache.org%3E
2:
https://github.com/apache/beam/blob/fa3ca2b11e2ca031232245814389d29c805f79e7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L223

On Thu, Jul 30, 2020 at 3:41 PM Praveen K Viswanathan <harish.praveen@gmail.com> wrote:

> Hello Dev team,
>
> We are taking our first shot at writing a Beam IO connector for Oracle
> Streaming Service (OSS). The plan is to implement it for enterprise use
> first and, based on feedback and stability, make it available as open
> source. This is our first attempt at developing a Beam IO connector, and so
> far we have progressed with the help of the Beam documentation and other
> related IOs like KafkaIO and KinesisIO. Thanks to the community on that
> front.
>
> OSS *has a read limit of 200 ms*. We read the data as shown below in our
> UnboundedReader's *advance()* method:
>
> // Get messages
> GetMessagesResponse getResponse =
>     this.streamClient.getMessages(getRequest);
>
> With this, we are able to read around five messages, but after that we get a
> *request throttling error*:
>
> Request was throttled because requests limit exhausted, next request can
> be made in 200 ms
>
> As an initial workaround, we introduced *Thread.sleep(200)* before the
> getMessages call to see how it behaves, and this time we were *able to read
> around 300+ messages*. With the expertise available in this forum, I would
> like to hear your input on two points:
>
>    1. How can we implement this properly, rather than with a one-line
>       Thread.sleep(200)?
>    2. After adding Thread.sleep(200) and reading 300+ messages, the pipeline
>       encountered the error below. I do not see any implementation-specific
>       detail in the stack trace. Could I get some insight into what this
>       error could be and how to handle it?
>
>    java.lang.NullPointerException
>        at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream (CoderUtils.java:82)
>        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:66)
>        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:51)
>        at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:141)
>        at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader (UnboundedReadEvaluatorFactory.java:224)
>        at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement (UnboundedReadEvaluatorFactory.java:132)
>        at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:160)
>        at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:124)
>        at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
>        at java.util.concurrent.FutureTask.run (FutureTask.java:266)
>        at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
>        at java.lang.Thread.run (Thread.java:748)
>
>
>
> --
> Thanks,
> Praveen K Viswanathan
>

Re: Request Throttling in OSSIO

Posted by Praveen K Viswanathan <ha...@gmail.com>.
Thanks Luke. I will go through them and come back if I have any questions.

Regards,
Praveen

-- 
Thanks,
Praveen K Viswanathan

Re: Request Throttling in OSSIO

Posted by Luke Cwik <lc...@google.com>.
Take a look at the WatchGrowthFn[1] and also the in-progress Kafka PR[2].

1:
https://github.com/apache/beam/blob/6612b24ada9382706373db547b5606d6e0496b02/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L787
2: https://github.com/apache/beam/pull/11749
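For orientation before reading those references, here is a JDK-only model of the control flow of a splittable DoFn's `@ProcessElement` method as described earlier in the thread: claim each offset from the restriction tracker before emitting it, and on throttling return a continuation with a resume delay instead of sleeping. `Fetcher`, `Tracker`, `Continuation` and `SdfModel` are hypothetical stand-ins; in Beam the method would receive a `RestrictionTracker<OffsetRange, Long>`, return `DoFn.ProcessContinuation.stop()` or `DoFn.ProcessContinuation.resume().withResumeDelay(...)`, and call the real OSS client. Treat it as a sketch of the loop, not of the Beam API.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical OSS read: messages starting at offset, or null when the request was throttled. */
interface Fetcher {
  List<String> fetch(long offset);
}

/** Minimal model of an offset-range restriction tracker: claim an offset before emitting it. */
class Tracker {
  private final long endExclusive; // Long.MAX_VALUE models an unbounded stream
  private long lastClaimed;

  Tracker(long start, long endExclusive) {
    this.endExclusive = endExclusive;
    this.lastClaimed = start - 1;
  }

  boolean tryClaim(long offset) {
    if (offset >= endExclusive) {
      return false; // outside this restriction
    }
    lastClaimed = offset;
    return true;
  }

  /** First offset not yet processed; work would resume from here. */
  long residualStart() {
    return lastClaimed + 1;
  }
}

/** Hypothetical model of DoFn.ProcessContinuation: stop, or resume after a suggested delay. */
final class Continuation {
  final boolean shouldResume;
  final long resumeDelayMillis;

  private Continuation(boolean shouldResume, long resumeDelayMillis) {
    this.shouldResume = shouldResume;
    this.resumeDelayMillis = resumeDelayMillis;
  }

  static Continuation stop() {
    return new Continuation(false, 0);
  }

  static Continuation resumeAfter(long millis) {
    return new Continuation(true, millis);
  }
}

final class SdfModel {
  /** Read, claim, emit; when throttled or idle, yield with a delay instead of sleeping. */
  static Continuation process(
      Tracker tracker, Fetcher fetcher, Consumer<String> out, long retryDelayMillis) {
    long offset = tracker.residualStart();
    while (true) {
      List<String> batch = fetcher.fetch(offset);
      if (batch == null || batch.isEmpty()) {
        return Continuation.resumeAfter(retryDelayMillis); // come back later
      }
      for (String message : batch) {
        if (!tracker.tryClaim(offset)) {
          return Continuation.stop(); // restriction exhausted
        }
        out.accept(message);
        offset++;
      }
    }
  }
}
```

Returning early after a throttled read lets the runner checkpoint the unclaimed remainder of the restriction (modeled here by `residualStart()`) and schedule it again after the suggested delay, which is the SDF counterpart of returning false from `advance()`.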

Re: Request Throttling in OSSIO

Posted by Praveen K Viswanathan <ha...@gmail.com>.
Thanks for the suggestions, Luke. As you know, we are just starting out, so we
should be able to switch to SplittableDoFn if that's the future of Beam IO
connectors. The SplittableDoFn page has the design details, but it would be
great to look at an existing IO connector built with SplittableDoFn as a
reference, to map the design details to an actual implementation. Could you
please suggest such an IO?

In parallel, I will also try your suggestions for advance() and the
checkpoint mark coder to close that issue.

Thanks,
Praveen

-- 
Thanks,
Praveen K Viswanathan