You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Josh <jo...@gmail.com> on 2017/06/02 11:06:00 UTC

Re: How to decrease latency when using PubsubIO.Read?

Hi guys,

I just wanted to give a quick update on what happened with this -
after looking into it some more, it appears the extra latency was entirely
caused by the publisher of the messages rather than the Beam job/PubsubIO.
After tweaking the publishers to use smaller batches and adhere to a
'maximum latency', the end-to-end latency (from publishing through to
Bigtable) now seems to be well under 1s, which is what I was hoping for!

Thanks again for the advice on this :)

Josh

On Wed, May 24, 2017 at 9:31 PM, <jo...@gmail.com> wrote:

> Hi Raghu,
>
> Thanks for the suggestions and for having a look at the metrics!
> I will try the reshuffle and tweaking the publisher batching over the next
> couple of days, along with Ankur's suggestions, and will report back if any
> of these have a significant difference.
>
> Best regards,
> Josh
>
> On 24 May 2017, at 21:17, Raghu Angadi <ra...@google.com> wrote:
>
> Josh,
>
> Reshuffle might also be worth trying. To clarify, ~1s end-to-end is not
> always simple given number of systems and services involved between
> publisher and eventual sink.
>
> Raghu.
>
> On Wed, May 24, 2017 at 12:32 PM, Raghu Angadi <ra...@google.com> wrote:
>
>> Thanks. Looked a some job system level metrics. I see minimal latency
>> within Dataflow pipeline itself, you might not see much improvement from
>> Reshuffle() (may be ~0.25 seconds).
>>
>> Do you have control on publisher? Publishers might be batching hundreds
>> of messages, which adds latency. You can try to reduce that. Even then
>> PubSub itself does some batching internally which might limit overall
>> latency. Removing publisher batching is worth a try.
>>
>>
>> On Wed, May 24, 2017 at 11:46 AM, Josh <jo...@gmail.com> wrote:
>>
>>> Hi Raghu,
>>>
>>> My job ID is 2017-05-24_02_46_42-11524480684503077480 - thanks for
>>> taking a look!
>>>
>>> Yes I'm using BigtableIO for the sink and I am measuring the end-to-end
>>> latency. It seems to take 3-6 seconds typically, I would like to get it
>>> down to ~1s.
>>>
>>> Thanks,
>>> Josh
>>>
>>> On Wed, May 24, 2017 at 6:50 PM, Raghu Angadi <ra...@google.com>
>>> wrote:
>>>
>>>> Josh,
>>>>
>>>> Can you share your job_id? I could take look. Are you measuring latency
>>>> end-to-end (publisher to when it appears on BT?). Are you using BigtableIO
>>>> for sink?
>>>>
>>>> There is no easy way to use more workers when auto-scaling is enabled.
>>>> It thinks your backlog and CPU are low enough and does not need to scale.
>>>> Raghu.
>>>>
>>>> On Wed, May 24, 2017 at 10:14 AM, Josh <jo...@gmail.com> wrote:
>>>>
>>>>> Thanks Ankur, that's super helpful! I will give these optimisations a
>>>>> go.
>>>>>
>>>>> About the "No operations completed" message - there are a few of these
>>>>> in the logs (but very few, like 1 an hour or something) - so probably no
>>>>> need to scale up Bigtable.
>>>>> I did however see a lot of INFO messages "Wrote 0 records" in the
>>>>> logs. Probably about 50% of the "Wrote n records" messages are zero.
>>>>> While the other 50% are quite high (e.g. "Wrote 80 records"). Not sure if
>>>>> that could indicate a bad setting?
>>>>>
>>>>> Josh
>>>>>
>>>>>
>>>>>
>>>>> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan <an...@malloc64.com>
>>>>> wrote:
>>>>>
>>>>>> There are two main things to see here:
>>>>>>
>>>>>> * In the logs, are there any messages like "No operations completed
>>>>>> within the last 61 seconds. There are still 1 simple operations and 1
>>>>>> complex operations in progress.” This means you are underscaled on the
>>>>>> bigtable side and would benefit from  increasing the node count.
>>>>>> * We also saw some improvement in performance (workload dependent) by
>>>>>> going to a bigger worker machine type.
>>>>>> * Another optimization that worked for our use case:
>>>>>>
>>>>>> // streaming dataflow has larger machines with smaller bundles, so we can queue up a lot more without blowing up
>>>>>> private static BigtableOptions createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>>>>>>     return new BigtableOptions.Builder()
>>>>>>             .setProjectId(opts.getProject())
>>>>>>             .setInstanceId(opts.getBigtableInstanceId())
>>>>>>             .setUseCachedDataPool(true)
>>>>>>             .setDataChannelCount(32)
>>>>>>             .setBulkOptions(new BulkOptions.Builder()
>>>>>>                     .setUseBulkApi(true)
>>>>>>                     .setBulkMaxRowKeyCount(2048)
>>>>>>                     .setBulkMaxRequestSize(8_388_608L)
>>>>>>                     .setAsyncMutatorWorkerCount(32)
>>>>>>                     .build())
>>>>>>             .build();
>>>>>> }
>>>>>>
>>>>>>
>>>>>> There is a lot of trial and error involved in getting the end-to-end
>>>>>> latency down so I would suggest enabling the profiling using the
>>>>>> —saveProfilesToGcs option and get a sense of what is exactly happening.
>>>>>>
>>>>>> — Ankur Chauhan
>>>>>>
>>>>>> On May 24, 2017, at 9:09 AM, Josh <jo...@gmail.com> wrote:
>>>>>>
>>>>>> Ah ok - I am using the Dataflow runner. I didn't realise about the
>>>>>> custom implementation being provided at runtime...
>>>>>>
>>>>>> Any ideas of how to tweak my job to either lower the latency
>>>>>> consuming from PubSub or to lower the latency in writing to Bigtable?
>>>>>>
>>>>>>
>>>>>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> What runner are you using (Flink, Spark, Google Cloud Dataflow,
>>>>>>> Apex, ...)?
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan <an...@malloc64.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Sorry that was an autocorrect error. I meant to ask - what dataflow
>>>>>>>> runner are you using? If you are using google cloud dataflow then the
>>>>>>>> PubsubIO class is not the one doing the reading from the pubsub topic. They
>>>>>>>> provide a custom implementation at run time.
>>>>>>>>
>>>>>>>> Ankur Chauhan
>>>>>>>> Sent from my iPhone
>>>>>>>>
>>>>>>>> On May 24, 2017, at 07:52, Josh <jo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Ankur,
>>>>>>>>
>>>>>>>> What do you mean by runner address?
>>>>>>>> Would you be able to link me to the comment you're referring to?
>>>>>>>>
>>>>>>>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>>>>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
>>>>>>>> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
>>>>>>>> o/gcp/pubsub/PubsubIO.java
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Josh
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan <an...@malloc64.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> What runner address you using. Google cloud dataflow uses a closed
>>>>>>>>> source version of the pubsub reader as noted in a comment on Read class.
>>>>>>>>>
>>>>>>>>> Ankur Chauhan
>>>>>>>>> Sent from my iPhone
>>>>>>>>>
>>>>>>>>> On May 24, 2017, at 04:05, Josh <jo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job
>>>>>>>>> then writes the data out to Bigtable. I'm currently seeing a latency of 3-5
>>>>>>>>> seconds between the messages being published and being written to Bigtable.
>>>>>>>>>
>>>>>>>>> I want to try and decrease the latency to <1s if possible - does
>>>>>>>>> anyone have any tips for doing this?
>>>>>>>>>
>>>>>>>>> I noticed that there is a PubsubGrpcClient
>>>>>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
>>>>>>>>> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
>>>>>>>>> o/gcp/pubsub/PubsubGrpcClient.java however the
>>>>>>>>> PubsubUnboundedSource is initialised with a PubsubJsonClient, so the Grpc
>>>>>>>>> client doesn't appear to be being used. Is there a way to switch to the
>>>>>>>>> Grpc client - as perhaps that would give better performance?
>>>>>>>>>
>>>>>>>>> Also, I am running my job on Dataflow using autoscaling, which has
>>>>>>>>> only allocated one n1-standard-4 instance to the job, which is
>>>>>>>>> running at ~50% CPU. Could forcing a higher number of nodes help improve
>>>>>>>>> latency?
>>>>>>>>>
>>>>>>>>> Thanks for any advice,
>>>>>>>>> Josh
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>