Posted to user@beam.apache.org by amir bahmanyari <am...@yahoo.com> on 2016/07/13 19:09:43 UTC

Random behavior with my Beam FlinkRunner streaming app

Hi Colleagues,
I am getting random results for:
- exact same data input
- exact same app binary
- exact same Flink cluster instances
Everything fixed, just a repeat of running the same thing. Every time, I get a different result while the data doesn't change, the code doesn't change, and the logic to calculate results is exactly the same...

Is Beam "parallelism" playing a role due to something "unusual" in my code? What could the "unusual" thing in the app be that makes the Beam pipeline produce different results for the exact same "everything"?
Thanks+regards,
Amir-
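The classic way this symptom arises is shared mutable state updated by parallel workers. As a plain-Java illustration (not Beam code, and only a guess at what the app might be doing), an unsynchronized counter incremented from several threads can total differently on every run, while an atomic counter stays deterministic:

```java
import java.util.concurrent.atomic.AtomicLong;

public class RacyCounterDemo {
    public static int unsafeCount = 0;                    // shared, unsynchronized
    public static final AtomicLong safeCount = new AtomicLong();

    public static void main(String[] args) throws InterruptedException {
        int threads = 8, perThread = 100_000;
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    unsafeCount++;               // read-modify-write race: updates can be lost
                    safeCount.incrementAndGet(); // atomic, deterministic
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) t.join();
        System.out.println("unsafe=" + unsafeCount + " safe=" + safeCount.get());
        // safeCount is always 800000; unsafeCount usually comes out lower, and varies per run
    }
}
```

If MyMethod (hypothetical name from later in this thread) bumps a plain field shared across parallel DoFn instances, the Flink runner's parallelism would produce exactly this kind of run-to-run variation.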

Re: Random behavior with my Beam FlinkRunner streaming app

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Raghu,
I am cleaning up my code and will share some of it when it's clean.
Thanks so much.

      From: Raghu Angadi <ra...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Wednesday, July 13, 2016 3:52 PM
 Subject: Re: Random behavior with my Beam FlinkRunner streaming app
   

On Wed, Jul 13, 2016 at 2:14 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Raghu,
- I receive the exact same number of records from Kafka each run.

How do you do that? (curious + I just want to make sure)
- I then invoke a method, say MyMethod, that's internal to the DoFn inner class.
- There is logic in MyMethod to extract certain fields from the record & increment a counter when a condition on those field values is satisfied.

There are multiple questions I could ask here, but this is vague enough that there could be lots of issues. If you are just incrementing a counter and aggregating it correctly, there is no reason to expect different results.
Lots of people here would be interested in taking a look if you can reproduce it outside your setup. But the most likely root cause is either the input records or how you are aggregating the final count.
That counter varies per run.
Thanks Raghu.

      From: Raghu Angadi <ra...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Wednesday, July 13, 2016 1:52 PM
 Subject: Re: Random behavior with my Beam FlinkRunner streaming app
   
More details would be useful. Pseudo code for your job would help (or actual code with any sensitive sections removed).
How are you making sure you read exactly the same set of records from Kafka in your reader? I don't see that you are configuring the Kafka reader for that in the code below.
On Wed, Jul 13, 2016 at 12:30 PM, amir bahmanyari <am...@yahoo.com> wrote:

try {
  PCollection<KV<String, String>> kafkarecords = p
      .apply(KafkaIO.read()
          .withBootstrapServers("kafkaprt:9092")
          .withTopics(topics)
          .withValueCoder(StringUtf8Coder.of())
          .withoutMetadata())
      .apply(ParDo.named("startBundle").of(
          new DoFn<KV<byte[], String>, KV<String, String>>() {
.....etc.


Re: Random behavior with my Beam FlinkRunner streaming app

Posted by Raghu Angadi <ra...@google.com>.
On Wed, Jul 13, 2016 at 2:14 PM, amir bahmanyari <am...@yahoo.com>
wrote:

> Hi Raghu,
> - I receive exact same number of records from Kafka each run.
>

How do you do that? (curious + I just want to make sure)


> - I then invoke a method  say MyMethod thats internal to the DoFn inner
> class.
> - There is logic there in MyMethod to extract certain fields from the
> record & increment a counter based on satisfying a condition on those
> fields values.
>

There are multiple questions I could ask here, but this is vague enough
that there could be lots of issues. If you are just incrementing a counter
and aggregating it correctly, there is no reason to expect different
results.

Lots of people here would be interested in taking a look if you can
reproduce it outside your setup. But the most likely root cause is
either the input records or how you are aggregating the final count.
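To make "aggregating it correctly" concrete: a deterministic pattern is to give each worker its own partial count and merge the partials at the end, which is essentially what Beam's Combine/Aggregator machinery does for you. This is a plain-Java sketch of that idea, not the Beam API, and the even-number predicate is just a stand-in for whatever condition the counter checks:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PartialCountDemo {
    // Count elements matching a predicate, split across workers,
    // then merge the per-worker partial counts at the end.
    public static long countMatching(List<Integer> data, int workers) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        List<Future<Long>> partials = new ArrayList<>();
        int chunk = (data.size() + workers - 1) / workers;
        for (int w = 0; w < workers; w++) {
            List<Integer> slice = data.subList(
                Math.min(w * chunk, data.size()),
                Math.min((w + 1) * chunk, data.size()));
            // Each worker counts only its own slice: no shared mutable state.
            partials.add(pool.submit(() ->
                slice.stream().filter(x -> x % 2 == 0).count()));
        }
        long total = 0;
        for (Future<Long> f : partials) total += f.get(); // merge step
        pool.shutdown();
        return total;
    }

    public static void main(String[] args) throws Exception {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 1000; i++) data.add(i);
        System.out.println(countMatching(data, 4)); // 500 even numbers in 0..999
    }
}
```

Because each partial is computed independently and the merge is a plain sum (commutative and associative), the total is the same regardless of how work is split or scheduled.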


> That counter varies as per each run.
> Thanks Raghu.
>
> ------------------------------
> *From:* Raghu Angadi <ra...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Wednesday, July 13, 2016 1:52 PM
> *Subject:* Re: Random behavior with my Beam FlinkRunner streaming app
>
> More details would be useful. Pseudo code for your job would help (or
> actual code with any sensitive sections removed).
>
> How are you making sure you read exactly the same set of records from
> Kafka in your reader? I don't see that you are configuring the Kafka
> reader for that in the code below.
>
> On Wed, Jul 13, 2016 at 12:30 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> try {
>   PCollection<KV<String, String>> kafkarecords = p
>       .apply(KafkaIO.read()
>           .withBootstrapServers("kafkaprt:9092")
>           .withTopics(topics)
>           .withValueCoder(StringUtf8Coder.of())
>           .withoutMetadata())
>       .apply(ParDo.named("startBundle").of(
>           new DoFn<KV<byte[], String>, KV<String, String>>() {
> .....etc.
>

Re: Random behavior with my Beam FlinkRunner streaming app

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Raghu,
- I receive the exact same number of records from Kafka each run.
- I then invoke a method, say MyMethod, that's internal to the DoFn inner class.
- There is logic in MyMethod to extract certain fields from the record & increment a counter when a condition on those field values is satisfied.
That counter varies per run.
Thanks Raghu.

      From: Raghu Angadi <ra...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Wednesday, July 13, 2016 1:52 PM
 Subject: Re: Random behavior with my Beam FlinkRunner streaming app
   
More details would be useful. Pseudo code for your job would help (or actual code with any sensitive sections removed).
How are you making sure you read exactly the same set of records from Kafka in your reader? I don't see that you are configuring the Kafka reader for that in the code below.
On Wed, Jul 13, 2016 at 12:30 PM, amir bahmanyari <am...@yahoo.com> wrote:

try {
  PCollection<KV<String, String>> kafkarecords = p
      .apply(KafkaIO.read()
          .withBootstrapServers("kafkaprt:9092")
          .withTopics(topics)
          .withValueCoder(StringUtf8Coder.of())
          .withoutMetadata())
      .apply(ParDo.named("startBundle").of(
          new DoFn<KV<byte[], String>, KV<String, String>>() {
.....etc.

Re: Random behavior with my Beam FlinkRunner streaming app

Posted by Raghu Angadi <ra...@google.com>.
More details would be useful. Pseudo code for your job would help
(or actual code with any sensitive sections removed).

How are you making sure you read exactly the same set of records from
Kafka in your reader? I don't see that you are configuring the Kafka
reader for that in the code below.

On Wed, Jul 13, 2016 at 12:30 PM, amir bahmanyari <am...@yahoo.com>
wrote:

> try {
>   PCollection<KV<String, String>> kafkarecords = p
>       .apply(KafkaIO.read()
>           .withBootstrapServers("kafkaprt:9092")
>           .withTopics(topics)
>           .withValueCoder(StringUtf8Coder.of())
>           .withoutMetadata())
>       .apply(ParDo.named("startBundle").of(
>           new DoFn<KV<byte[], String>, KV<String, String>>() {
> .....etc.
>

Re: Random behavior with my Beam FlinkRunner streaming app

Posted by Lukasz Cwik <lc...@google.com>.
A bundle is a logical grouping of records, controlled by the runner,
that breaks really large datasets down into smaller, manageable pieces.

Collect all the records that you see during processElement into a list
inside your DoFn, and then output a single log line in finishBundle with all
the elements that you saw. Clear the list after outputting the log line.
Then you'll see the bundles of work that your DoFns are processing and can
logically see what's occurring.
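The processElement/finishBundle lifecycle described above can be mimicked outside Beam. This plain-Java sketch is not the actual DoFn API (and in Beam the runner, not the caller, picks the bundle boundaries), but it shows the suggested debugging pattern of buffering records and emitting one log line per bundle:

```java
import java.util.ArrayList;
import java.util.List;

public class BundleLoggingSketch {
    private final List<String> buffer = new ArrayList<>();
    public final List<String> logLines = new ArrayList<>();

    // Called once per record, like DoFn.processElement.
    public void processElement(String record) {
        buffer.add(record);
    }

    // Called at bundle boundaries, like DoFn.finishBundle:
    // emit one log line per bundle, then reset for the next bundle.
    public void finishBundle() {
        logLines.add("bundle: " + buffer);
        buffer.clear();
    }

    public static void main(String[] args) {
        BundleLoggingSketch fn = new BundleLoggingSketch();
        String[] records = {"r1", "r2", "r3", "r4", "r5"};
        int bundleSize = 2; // here we pick a size; in Beam the runner decides
        for (int i = 0; i < records.length; i++) {
            fn.processElement(records[i]);
            if ((i + 1) % bundleSize == 0) fn.finishBundle();
        }
        fn.finishBundle(); // flush the trailing partial bundle
        fn.logLines.forEach(System.out::println);
        // prints: bundle: [r1, r2] / bundle: [r3, r4] / bundle: [r5]
    }
}
```

Comparing these per-bundle log lines across two runs shows immediately whether the same records arrive in different bundles or orders.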

On Wed, Jul 13, 2016 at 3:47 PM, amir bahmanyari <am...@yahoo.com>
wrote:

> Thanks Lukasz.
> "logging the records you see for each bundle " makes me wonder why you
> are referring to "bundle"?
> Sorry my confusion.
> My assumption is that I receive "one record" at a time from Kafka, and I
> am executing "one record" at a time in DoFn class object.
> Is there something in the way I am invoking KafkaIO that translates to a
> "bundle" rather than a "single record" at a time?
> try{
> PCollection<KV<String, String>>
> kafkarecords=p.apply(KafkaIO.read().withBootstrapServers("kafkaprt:9092").withTopics(topics).withValueCoder(StringUtf8Coder.of())
> .withoutMetadata()).apply(ParDo.named("startBundle").of(
> new DoFn<KV<byte[], String>, KV<String, String>>() {
>
> Perhaps I am overlapping / repeating...somehow...
> Thanks again Lukasz.
>
>
> ------------------------------
> *From:* Lukasz Cwik <lc...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Wednesday, July 13, 2016 12:38 PM
>
> *Subject:* Re: Random behavior with my Beam FlinkRunner streaming app
>
> Try using smaller datasets and logging the records you see for each bundle
> in your DoFns. This will help you see how your data is transitioning
> through the pipeline.
>
> On Wed, Jul 13, 2016 at 3:30 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Thanks Lukasz,
> I receive records via Kafka to my Beam app KafkaIO.read():
> And this is how:
> try{
> PCollection<KV<String, String>>
> kafkarecords=p.apply(KafkaIO.read().withBootstrapServers("kafkaprt:9092").withTopics(topics).withValueCoder(StringUtf8Coder.of())
> .withoutMetadata()).apply(ParDo.named("startBundle").of(
> new DoFn<KV<byte[], String>, KV<String, String>>() {
> .....etc.
>
> Pls let me know if I should provide further deeper details....I appreciate
> your attention.
> Am sure there are lessons for me to learn from "this" :-)
> Cheers+thanks so much.
>
>
>
> ------------------------------
> *From:* Lukasz Cwik <lc...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Wednesday, July 13, 2016 12:22 PM
> *Subject:* Re: Random behavior with my Beam FlinkRunner streaming app
>
> Are your DoFns idempotent, and do they avoid relying on the ordering of
> elements? Do you use any triggers?
>
> Lots of things can add non-determinism to your output; we need more details
> about what your pipeline does.
> Using smaller input datasets can help you track down the source of
> non-determinism.
>
>
> On Wed, Jul 13, 2016 at 3:09 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Colleagues,
> I am getting random results for:
> - exact same data input
> - exact same app binary
> - exact same Flink cluster instances
> Everything fixed, just a repeat of running the same thing.
> Every time, I get a different result while the data doesn't change, the
> code doesn't change, and the logic to calculate results is exactly the same...
>
> Is Beam "parallelism" playing a role due to something "unusual" in my
> code?
> What could the "unusual" thing in the app be that makes the Beam pipeline
> produce different results for the exact same "everything"?
> Thanks+regards,
> Amir-
>

Re: Random behavior with my Beam FlinkRunner streaming app

Posted by amir bahmanyari <am...@yahoo.com>.
Thanks Lukasz.
"Logging the records you see for each bundle" makes me wonder why you are referring to a "bundle"?
Sorry, my confusion.
My assumption is that I receive "one record" at a time from Kafka, and I am executing "one record" at a time in the DoFn class object.
Is there something in the way I am invoking KafkaIO that translates to a "bundle" rather than a "single record" at a time?
try {
  PCollection<KV<String, String>> kafkarecords = p
      .apply(KafkaIO.read()
          .withBootstrapServers("kafkaprt:9092")
          .withTopics(topics)
          .withValueCoder(StringUtf8Coder.of())
          .withoutMetadata())
      .apply(ParDo.named("startBundle").of(
          new DoFn<KV<byte[], String>, KV<String, String>>() {
Perhaps I am overlapping / repeating... somehow...
Thanks again Lukasz.

      From: Lukasz Cwik <lc...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Wednesday, July 13, 2016 12:38 PM
 Subject: Re: Random behavior with my Beam FlinkRunner streaming app
   
Try using smaller datasets and logging the records you see for each bundle in your DoFns. This will help you see how your data is transitioning through the pipeline.
On Wed, Jul 13, 2016 at 3:30 PM, amir bahmanyari <am...@yahoo.com> wrote:

Thanks Lukasz,
I receive records into my Beam app via KafkaIO.read(). This is how:
try {
  PCollection<KV<String, String>> kafkarecords = p
      .apply(KafkaIO.read()
          .withBootstrapServers("kafkaprt:9092")
          .withTopics(topics)
          .withValueCoder(StringUtf8Coder.of())
          .withoutMetadata())
      .apply(ParDo.named("startBundle").of(
          new DoFn<KV<byte[], String>, KV<String, String>>() {
.....etc.
Please let me know if I should provide deeper details... I appreciate your attention.
I am sure there are lessons for me to learn from "this" :-)
Cheers + thanks so much.


      From: Lukasz Cwik <lc...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Wednesday, July 13, 2016 12:22 PM
 Subject: Re: Random behavior with my Beam FlinkRunner streaming app
  
Are your DoFns idempotent, and do they avoid relying on the ordering of elements? Do you use any triggers?
Lots of things can add non-determinism to your output; we need more details about what your pipeline does. Using smaller input datasets can help you track down the source of non-determinism.


On Wed, Jul 13, 2016 at 3:09 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Colleagues,
I am getting random results for:
- exact same data input
- exact same app binary
- exact same Flink cluster instances
Everything fixed, just a repeat of running the same thing. Every time, I get a different result while the data doesn't change, the code doesn't change, and the logic to calculate results is exactly the same...

Is Beam "parallelism" playing a role due to something "unusual" in my code? What could the "unusual" thing in the app be that makes the Beam pipeline produce different results for the exact same "everything"?
Thanks+regards,
Amir-


Re: Random behavior with my Beam FlinkRunner streaming app

Posted by Lukasz Cwik <lc...@google.com>.
Try using smaller datasets and logging the records you see for each bundle
in your DoFns. This will help you see how your data is transitioning
through the pipeline.

On Wed, Jul 13, 2016 at 3:30 PM, amir bahmanyari <am...@yahoo.com>
wrote:

> Thanks Lukasz,
> I receive records via Kafka to my Beam app KafkaIO.read():
> And this is how:
> try{
> PCollection<KV<String, String>>
> kafkarecords=p.apply(KafkaIO.read().withBootstrapServers("kafkaprt:9092").withTopics(topics).withValueCoder(StringUtf8Coder.of())
> .withoutMetadata()).apply(ParDo.named("startBundle").of(
> new DoFn<KV<byte[], String>, KV<String, String>>() {
> .....etc.
>
> Pls let me know if I should provide further deeper details....I appreciate
> your attention.
> Am sure there are lessons for me to learn from "this" :-)
> Cheers+thanks so much.
>
>
>
> ------------------------------
> *From:* Lukasz Cwik <lc...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Wednesday, July 13, 2016 12:22 PM
> *Subject:* Re: Random behavior with my Beam FlinkRunner streaming app
>
> Are your DoFns idempotent, and do they avoid relying on the ordering of
> elements? Do you use any triggers?
>
> Lots of things can add non-determinism to your output; we need more details
> about what your pipeline does.
> Using smaller input datasets can help you track down the source of
> non-determinism.
>
>
> On Wed, Jul 13, 2016 at 3:09 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Colleagues,
> I am getting random results for:
> - exact same data input
> - exact same app binary
> - exact same Flink cluster instances
> Everything fixed, just a repeat of running the same thing.
> Every time, I get a different result while the data doesn't change, the
> code doesn't change, and the logic to calculate results is exactly the same...
>
> Is Beam "parallelism" playing a role due to something "unusual" in my
> code?
> What could the "unusual" thing in the app be that makes the Beam pipeline
> produce different results for the exact same "everything"?
> Thanks+regards,
> Amir-
>

Re: Random behavior with my Beam FlinkRunner streaming app

Posted by amir bahmanyari <am...@yahoo.com>.
Thanks Lukasz,
I receive records into my Beam app via KafkaIO.read(). This is how:
try {
  PCollection<KV<String, String>> kafkarecords = p
      .apply(KafkaIO.read()
          .withBootstrapServers("kafkaprt:9092")
          .withTopics(topics)
          .withValueCoder(StringUtf8Coder.of())
          .withoutMetadata())
      .apply(ParDo.named("startBundle").of(
          new DoFn<KV<byte[], String>, KV<String, String>>() {
.....etc.
Please let me know if I should provide deeper details... I appreciate your attention.
I am sure there are lessons for me to learn from "this" :-)
Cheers + thanks so much.


      From: Lukasz Cwik <lc...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Wednesday, July 13, 2016 12:22 PM
 Subject: Re: Random behavior with my Beam FlinkRunner streaming app
   
Are your DoFns idempotent, and do they avoid relying on the ordering of elements? Do you use any triggers?
Lots of things can add non-determinism to your output; we need more details about what your pipeline does. Using smaller input datasets can help you track down the source of non-determinism.


On Wed, Jul 13, 2016 at 3:09 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Colleagues,
I am getting random results for:
- exact same data input
- exact same app binary
- exact same Flink cluster instances
Everything fixed, just a repeat of running the same thing. Every time, I get a different result while the data doesn't change, the code doesn't change, and the logic to calculate results is exactly the same...

Is Beam "parallelism" playing a role due to something "unusual" in my code? What could the "unusual" thing in the app be that makes the Beam pipeline produce different results for the exact same "everything"?
Thanks+regards,
Amir-

Re: Random behavior with my Beam FlinkRunner streaming app

Posted by Lukasz Cwik <lc...@google.com>.
Are your DoFns idempotent, and do they avoid relying on the ordering of
elements? Do you use any triggers?

Lots of things can add non-determinism to your output; we need more details
about what your pipeline does.
Using smaller input datasets can help you track down the source of
non-determinism.
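To illustrate the ordering point in plain Java (not Beam code): a commutative, associative aggregation like a sum gives the same answer under any interleaving of elements, while an order-dependent one like concatenation does not. The reordered list below stands in for one possible parallel interleaving:

```java
import java.util.Arrays;
import java.util.List;

public class OrderSensitivityDemo {
    public static void main(String[] args) {
        List<Integer> original = Arrays.asList(3, 1, 2);
        List<Integer> reordered = Arrays.asList(2, 3, 1); // a possible parallel interleaving

        // Commutative + associative aggregation: element order never matters.
        int sum1 = original.stream().mapToInt(Integer::intValue).sum();
        int sum2 = reordered.stream().mapToInt(Integer::intValue).sum();
        System.out.println(sum1 + " == " + sum2); // 6 == 6

        // Order-dependent aggregation: the result changes with the interleaving.
        String c1 = original.stream().map(String::valueOf).reduce("", String::concat);
        String c2 = reordered.stream().map(String::valueOf).reduce("", String::concat);
        System.out.println(c1 + " vs " + c2); // 312 vs 231
    }
}
```

If any step of the pipeline behaves like the concatenation case, parallel execution alone is enough to produce different results from identical input.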


On Wed, Jul 13, 2016 at 3:09 PM, amir bahmanyari <am...@yahoo.com>
wrote:

> Hi Colleagues,
> I am getting random results for:
> - exact same data input
> - exact same app binary
> - exact same Flink cluster instances
> Everything fixed, just a repeat of running the same thing.
> Every time, I get a different result while the data doesn't change, the
> code doesn't change, and the logic to calculate results is exactly the same...
>
> Is Beam "parallelism" playing a role due to something "unusual" in my
> code?
> What could the "unusual" thing in the app be that makes the Beam pipeline
> produce different results for the exact same "everything"?
> Thanks+regards,
> Amir-
>