Posted to user@beam.apache.org by amir bahmanyari <am...@yahoo.com> on 2016/07/29 19:09:02 UTC

Any data gets cached?

Sorry colleagues,
I am chasing a moving target in my Beam app, which runs with FlinkRunner on a Flink cluster of 2 nodes.
Every run produces a different result, while we know what the result MUST be: it's an expected fixed number.
I checked, and Kafka is NOT sending any extra records.
My first suspect was the Redis thread-UNsafe hashmap objects.
I replaced them with Java ConcurrentHashMaps. Wow! It worked PERFECTLY for the very first time.
I then re-ran the exact same thing, expecting the exact same result.
But it's different. I ran it again: another different, wrong result. A different result each time.
No code change, nothing different.

I am wondering if some previous data/record gets cached somewhere by Beam/FlinkRunner/KafkaIO invocation etc.
Sorry for the long email. I am losing my mind catching this moving target :))
I'd appreciate your kind feedback on this.
Cheers + have a great weekend.
Amir-

Re: Any data gets cached?

Posted by amir bahmanyari <am...@yahoo.com>.
Thanks Raghu... I appreciate the response. Can you point me to a doc/pub where I can see how "aggregating using Beam primitives" is actually implemented, please?
Have a great weekend.
Amir-

      From: Raghu Angadi <ra...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Friday, July 29, 2016 2:32 PM
 Subject: Re: Any data gets cached?
   
I suggest you try aggregating using Beam primitives (GroupByKey, Count, etc.) and see whether it produces consistent results.
On Fri, Jul 29, 2016 at 12:09 PM, amir bahmanyari <am...@yahoo.com> wrote:

Sorry colleagues,I am having a moving target in my Beam app with FlinkRunner in a Flink Cluster of 2 nodes.Every run produces a different result while we know what the result MUST be: its an expected fixed number.I checked and see Kafka is NOT sending any extra records.My first suspect was Redis thread-UNsafe hashmap objects.I replaced them with Java ConcurrentHashMaps. Wow! it worked for the very first time every PERFECT.I then re-run exact same thing expecting the exact same previous result.But its different. Run it again. Another different wrong result. A different result each time.No code change nothing different.
I am wondering if some previous data/record gets cached by Beam/FlinkRunner/KafkaIO invocation etc. somewhere.Sorry for the long email. Am losing my mind catching this moving target :))Appreciate your kind feedback on this.Cheers+have a great weekend.Amir-



   

Re: Any data gets cached?

Posted by amir bahmanyari <am...@yahoo.com>.
Thanks Raghu... we are on the same page, but I am not sure how it applies to my own code :(

Pipeline p = Pipeline.create(options);
............................etc...........................
try {
PCollection<KV<String, String>> kafkarecords =
p.apply(KafkaIO.read().withBootstrapServers("kafhost:9092").withTopics(kaftopic)
.withValueCoder(StringUtf8Coder.of()).withoutMetadata())
.apply(ParDo.named("startBundle").of(
new DoFn<KV<byte[], String>, KV<String, String>>() {
...................etc......................
@Override
public void processElement(ProcessContext ctx) throws Exception {
............................etc......................................

I apply a DoFn as an anonymous class, and there each record is processed one at a time. Supposedly.
But the resulting count always exceeds the expected value; it is inconsistent and varies from run to run.
I have my own doubts about how to guarantee "aggregation" consistency given the pipeline's "parallelism".
I receive n records, that is a proven fact, but my transformation executes on more records than that.
Sorry for the long email... I just don't know how to verify the pipeline consistency.
Cheers

      From: Raghu Angadi <ra...@google.com>
 To: amir bahmanyari <am...@yahoo.com> 
Cc: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
 Sent: Tuesday, August 2, 2016 10:31 PM
 Subject: Re: Any data gets cached?
   

On Tue, Aug 2, 2016 at 4:37 PM, amir bahmanyari <am...@yahoo.com> wrote:

     .apply(new CountWords()) 

// counts same word occurence and return it as a long
                                                      
That is an aggregation. 
// Format the long result created by CountWords  object to text     .apply(MapElements.via(new FormatAsTextFn()))

This is not. 
// TextIO to write result as text into an output-file set in options





  

Re: Any data gets cached?

Posted by Raghu Angadi <ra...@google.com>.
On Tue, Aug 2, 2016 at 4:37 PM, amir bahmanyari <am...@yahoo.com> wrote:

>      .apply(new CountWords())
>
> // counts same word occurrence and return it as a long
>

That is an aggregation.


> // Format the long result created by CountWords  object to text
>      .apply(MapElements.via(new FormatAsTextFn()))
>

This is not.


> // TextIO to write result as text into an output-file set in options
>
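
To make the distinction above concrete, here is a minimal plain-Java sketch. It does not use the Beam API: countWords and formatAsText are hypothetical stand-ins for the roles that CountWords and FormatAsTextFn play in the WordCount example, and the "word: count" output format is an assumption. The first method combines many input elements into one value per key (an aggregation); the second turns each element into exactly one output element (not an aggregation).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AggregateThenFormat {
    // Aggregation: many input words collapse into one count per distinct word.
    // Hypothetical stand-in, not the Beam CountWords transform.
    public static Map<String, Long> countWords(List<String> words) {
        Map<String, Long> counts = new TreeMap<>();
        for (String word : words) {
            counts.merge(word, 1L, Long::sum);
        }
        return counts;
    }

    // Element-wise mapping: each (word, count) pair becomes exactly one line.
    // Nothing is combined across elements, so this is not an aggregation.
    public static List<String> formatAsText(Map<String, Long> counts) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            lines.add(entry.getKey() + ": " + entry.getValue());
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("to", "be", "or", "not", "to", "be");
        Map<String, Long> counts = countWords(words);
        // Six input words are reduced to four keyed counts, then formatted one-to-one.
        System.out.println(formatAsText(counts));
    }
}
```

Note that the formatting step always emits as many lines as there are counts, while the counting step is the only place where the number of elements changes.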

Re: Any data gets cached?

Posted by amir bahmanyari <am...@yahoo.com>.
:))) OK, I'll give it a try.

// TextIO to read a line from the input file set in options
p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
// Instantiate CountWords and pass it the line that was just read in the previous step for processing
     .apply(new CountWords())
// counts occurrences of the same word and returns the count as a long
// Format the long result created by the CountWords object as text
     .apply(MapElements.via(new FormatAsTextFn()))
// TextIO to write the result as text into the output file set in options
     .apply("WriteCounts", TextIO.Write.to(options.getOutput()));

Did I pass? :))
Thanks

      From: Raghu Angadi <ra...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Tuesday, August 2, 2016 3:59 PM
 Subject: Re: Any data gets cached?
   
Hi Amir,
It is not clear what you want to do. I am not even sure we both mean the same thing when we say 'aggregation' :). Can you confirm if you understand how Beam WordCount.java example works? Especially the lines 208-211.

On Tue, Aug 2, 2016 at 11:25 AM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Raghu,Any opinion on this pls? I appreciate your time...Cheers

      From: amir bahmanyari <am...@yahoo.com>
 To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org> 
 Sent: Sunday, July 31, 2016 3:05 PM
 Subject: Re: Any data gets cached?
   
Hi Raghu,Thanks so much for your response. Following is how I am reading unbounded records from Kafka through KafkaIO() & processing them in its corresponding inner class.How do I include "aggregation" in this call?have a great weekend.
Pipeline p = Pipeline.create(options);
............................etc...........................try { PCollection<KV<String, String>> kafkarecords = p.apply(KafkaIO.read().withBootstrapServers("kafhost:9092").withTopics(kaftopic) .withValueCoder(StringUtf8Coder.of()).withoutMetadata()) .apply(ParDo.named("startBundle").of( new DoFn<KV<byte[], String>, KV<String, String>>() {...................etc......................
                                            @Override public void processElement(ProcessContext ctx) throws Exception {............................etc......................................       From: Raghu Angadi <ra...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Saturday, July 30, 2016 2:15 PM
 Subject: Re: Any data gets cached?
  
On the surface it looks like you are asking about basic aggregations. These are of course provided by Beam too. Almost all Beam examples make use of these. See 'Count.<string>PerElement()' in WordCound.java example. If not either post your Beam code or roughly equivalent SQL here.
On Fri, Jul 29, 2016 at 4:26 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Raghu,Is this the right assumption that if results are not aggregated we may have inconsistency in what the final result may look like?What would be the best aggregation approach to guarantee consistency? Even if there is perf. cost.Thanks

      From: Raghu Angadi <ra...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Friday, July 29, 2016 2:32 PM
 Subject: Re: Any data gets cached?
  
I suggest you try aggregating using Beam primitives (GroupByKey, count etc), see if it produced in consistent results.
On Fri, Jul 29, 2016 at 12:09 PM, amir bahmanyari <am...@yahoo.com> wrote:

Sorry colleagues,I am having a moving target in my Beam app with FlinkRunner in a Flink Cluster of 2 nodes.Every run produces a different result while we know what the result MUST be: its an expected fixed number.I checked and see Kafka is NOT sending any extra records.My first suspect was Redis thread-UNsafe hashmap objects.I replaced them with Java ConcurrentHashMaps. Wow! it worked for the very first time every PERFECT.I then re-run exact same thing expecting the exact same previous result.But its different. Run it again. Another different wrong result. A different result each time.No code change nothing different.
I am wondering if some previous data/record gets cached by Beam/FlinkRunner/KafkaIO invocation etc. somewhere.Sorry for the long email. Am losing my mind catching this moving target :))Appreciate your kind feedback on this.Cheers+have a great weekend.Amir-



   



   

   



  

Re: Any data gets cached?

Posted by Raghu Angadi <ra...@google.com>.
Hi Amir,

It is not clear what you want to do. I am not even sure we both mean the
same thing when we say 'aggregation' :).
Can you confirm if you understand how Beam WordCount.java
<https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java#L208>
example works? Especially the lines 208-211.


On Tue, Aug 2, 2016 at 11:25 AM, amir bahmanyari <am...@yahoo.com>
wrote:

> Hi Raghu,
> Any opinion on this pls? I appreciate your time...
> Cheers
>
>
> ------------------------------
> *From:* amir bahmanyari <am...@yahoo.com>
> *To:* "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
> *Sent:* Sunday, July 31, 2016 3:05 PM
>
> *Subject:* Re: Any data gets cached?
>
> Hi Raghu,
> Thanks so much for your response. Following is how I am reading unbounded
> records from Kafka through KafkaIO() & processing them in its corresponding
> inner class.
> How do I include "aggregation" in this call?
> have a great weekend.
>
> Pipeline p = Pipeline.create(options);
> ............................etc...........................
> try {
> PCollection<KV<String, String>> kafkarecords =
> p.apply(KafkaIO.read().withBootstrapServers("kafhost:9092").withTopics(kaftopic)
> .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
> .apply(ParDo.named("startBundle").of(
> new DoFn<KV<byte[], String>, KV<String, String>>() {
> ...................etc......................
>
>                                             @Override
> public void processElement(ProcessContext ctx) throws Exception {
> ............................etc......................................
> ------------------------------
> *From:* Raghu Angadi <ra...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Saturday, July 30, 2016 2:15 PM
> *Subject:* Re: Any data gets cached?
>
> On the surface it looks like you are asking about basic aggregations.
> These are of course provided by Beam too. Almost all Beam examples make use
> of these. See 'Count.<string>PerElement()
> <https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java#L152>'
> in WordCound.java example. If not either post your Beam code or roughly
> equivalent SQL here.
>
> On Fri, Jul 29, 2016 at 4:26 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Raghu,
> Is this the right assumption that if results are not aggregated we may
> have inconsistency in what the final result may look like?
> What would be the best aggregation approach to guarantee consistency? Even
> if there is perf. cost.
> Thanks
>
> ------------------------------
> *From:* Raghu Angadi <ra...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Friday, July 29, 2016 2:32 PM
> *Subject:* Re: Any data gets cached?
>
> I suggest you try aggregating using Beam primitives (GroupByKey, count
> etc), see if it produced in consistent results.
>
> On Fri, Jul 29, 2016 at 12:09 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Sorry colleagues,
> I am having a moving target in my Beam app with FlinkRunner in a Flink
> Cluster of 2 nodes.
> Every run produces a different result while we know what the result MUST
> be: its an expected fixed number.
> I checked and see Kafka is NOT sending any extra records.
> My first suspect was Redis thread-UNsafe hashmap objects.
> I replaced them with Java ConcurrentHashMaps. Wow! it worked for the very
> first time every PERFECT.
> I then re-run exact same thing expecting the exact same previous result.
> But its different. Run it again. Another different wrong result. A
> different result each time.
> No code change nothing different.
>
> I am wondering if some previous data/record gets cached by
> Beam/FlinkRunner/KafkaIO invocation etc. somewhere.
> Sorry for the long email. Am losing my mind catching this moving target :))
> Appreciate your kind feedback on this.
> Cheers+have a great weekend.
> Amir-
>
>
>
>
>
>
>
>
>
>

Re: Any data gets cached?

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Raghu,
Any opinion on this, please? I appreciate your time...
Cheers

      From: amir bahmanyari <am...@yahoo.com>
 To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org> 
 Sent: Sunday, July 31, 2016 3:05 PM
 Subject: Re: Any data gets cached?
   
Hi Raghu,Thanks so much for your response. Following is how I am reading unbounded records from Kafka through KafkaIO() & processing them in its corresponding inner class.How do I include "aggregation" in this call?have a great weekend.
Pipeline p = Pipeline.create(options);
............................etc...........................try { PCollection<KV<String, String>> kafkarecords = p.apply(KafkaIO.read().withBootstrapServers("kafhost:9092").withTopics(kaftopic) .withValueCoder(StringUtf8Coder.of()).withoutMetadata()) .apply(ParDo.named("startBundle").of( new DoFn<KV<byte[], String>, KV<String, String>>() {...................etc......................
                                            @Override public void processElement(ProcessContext ctx) throws Exception {............................etc......................................       From: Raghu Angadi <ra...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Saturday, July 30, 2016 2:15 PM
 Subject: Re: Any data gets cached?
  
On the surface it looks like you are asking about basic aggregations. These are of course provided by Beam too. Almost all Beam examples make use of these. See 'Count.<string>PerElement()' in WordCound.java example. If not either post your Beam code or roughly equivalent SQL here.
On Fri, Jul 29, 2016 at 4:26 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Raghu,Is this the right assumption that if results are not aggregated we may have inconsistency in what the final result may look like?What would be the best aggregation approach to guarantee consistency? Even if there is perf. cost.Thanks

      From: Raghu Angadi <ra...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Friday, July 29, 2016 2:32 PM
 Subject: Re: Any data gets cached?
  
I suggest you try aggregating using Beam primitives (GroupByKey, count etc), see if it produced in consistent results.
On Fri, Jul 29, 2016 at 12:09 PM, amir bahmanyari <am...@yahoo.com> wrote:

Sorry colleagues,I am having a moving target in my Beam app with FlinkRunner in a Flink Cluster of 2 nodes.Every run produces a different result while we know what the result MUST be: its an expected fixed number.I checked and see Kafka is NOT sending any extra records.My first suspect was Redis thread-UNsafe hashmap objects.I replaced them with Java ConcurrentHashMaps. Wow! it worked for the very first time every PERFECT.I then re-run exact same thing expecting the exact same previous result.But its different. Run it again. Another different wrong result. A different result each time.No code change nothing different.
I am wondering if some previous data/record gets cached by Beam/FlinkRunner/KafkaIO invocation etc. somewhere.Sorry for the long email. Am losing my mind catching this moving target :))Appreciate your kind feedback on this.Cheers+have a great weekend.Amir-



   



   

  

Re: Any data gets cached?

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Raghu,
Thanks so much for your response. The following is how I am reading unbounded records from Kafka through KafkaIO and processing them in its corresponding inner class.
How do I include "aggregation" in this call?
Have a great weekend.

Pipeline p = Pipeline.create(options);
............................etc...........................
try {
PCollection<KV<String, String>> kafkarecords =
p.apply(KafkaIO.read().withBootstrapServers("kafhost:9092").withTopics(kaftopic)
.withValueCoder(StringUtf8Coder.of()).withoutMetadata())
.apply(ParDo.named("startBundle").of(
new DoFn<KV<byte[], String>, KV<String, String>>() {
...................etc......................
@Override
public void processElement(ProcessContext ctx) throws Exception {
............................etc......................................

      From: Raghu Angadi <ra...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Saturday, July 30, 2016 2:15 PM
 Subject: Re: Any data gets cached?
   
On the surface it looks like you are asking about basic aggregations. These are of course provided by Beam too. Almost all Beam examples make use of these. See 'Count.<string>PerElement()' in WordCound.java example. If not either post your Beam code or roughly equivalent SQL here.
On Fri, Jul 29, 2016 at 4:26 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Raghu,Is this the right assumption that if results are not aggregated we may have inconsistency in what the final result may look like?What would be the best aggregation approach to guarantee consistency? Even if there is perf. cost.Thanks

      From: Raghu Angadi <ra...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Friday, July 29, 2016 2:32 PM
 Subject: Re: Any data gets cached?
  
I suggest you try aggregating using Beam primitives (GroupByKey, count etc), see if it produced in consistent results.
On Fri, Jul 29, 2016 at 12:09 PM, amir bahmanyari <am...@yahoo.com> wrote:

Sorry colleagues,I am having a moving target in my Beam app with FlinkRunner in a Flink Cluster of 2 nodes.Every run produces a different result while we know what the result MUST be: its an expected fixed number.I checked and see Kafka is NOT sending any extra records.My first suspect was Redis thread-UNsafe hashmap objects.I replaced them with Java ConcurrentHashMaps. Wow! it worked for the very first time every PERFECT.I then re-run exact same thing expecting the exact same previous result.But its different. Run it again. Another different wrong result. A different result each time.No code change nothing different.
I am wondering if some previous data/record gets cached by Beam/FlinkRunner/KafkaIO invocation etc. somewhere.Sorry for the long email. Am losing my mind catching this moving target :))Appreciate your kind feedback on this.Cheers+have a great weekend.Amir-



   



  

Re: Any data gets cached?

Posted by Raghu Angadi <ra...@google.com>.
On the surface it looks like you are asking about basic aggregations. These
are of course provided by Beam too. Almost all Beam examples make use of
these. See 'Count.<String>perElement()
<https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java#L152>'
in the WordCount.java example. If not, either post your Beam code or roughly
equivalent SQL here.
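
As a rough illustration of what a per-element count computes, here is a minimal sketch using plain Java streams. It is only an analogy and does not call Beam: countPerElement is a hypothetical helper name, and the sample records are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class PerElementCount {
    // Hypothetical helper, not Beam's Count transform: groups equal elements
    // together and counts how many times each distinct element occurs.
    public static Map<String, Long> countPerElement(List<String> elements) {
        return elements.stream()
                .collect(Collectors.groupingBy(e -> e, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "a", "c", "a");
        // Prints {a=3, b=1, c=1}
        System.out.println(countPerElement(records));
    }
}
```

The result depends only on which elements are present and how often, not on any mutable state shared between per-record calls.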

On Fri, Jul 29, 2016 at 4:26 PM, amir bahmanyari <am...@yahoo.com>
wrote:

> Hi Raghu,
> Is this the right assumption that if results are not aggregated we may
> have inconsistency in what the final result may look like?
> What would be the best aggregation approach to guarantee consistency? Even
> if there is perf. cost.
> Thanks
>
> ------------------------------
> *From:* Raghu Angadi <ra...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Friday, July 29, 2016 2:32 PM
> *Subject:* Re: Any data gets cached?
>
> I suggest you try aggregating using Beam primitives (GroupByKey, count
> etc), see if it produced in consistent results.
>
> On Fri, Jul 29, 2016 at 12:09 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Sorry colleagues,
> I am having a moving target in my Beam app with FlinkRunner in a Flink
> Cluster of 2 nodes.
> Every run produces a different result while we know what the result MUST
> be: its an expected fixed number.
> I checked and see Kafka is NOT sending any extra records.
> My first suspect was Redis thread-UNsafe hashmap objects.
> I replaced them with Java ConcurrentHashMaps. Wow! it worked for the very
> first time every PERFECT.
> I then re-run exact same thing expecting the exact same previous result.
> But its different. Run it again. Another different wrong result. A
> different result each time.
> No code change nothing different.
>
> I am wondering if some previous data/record gets cached by
> Beam/FlinkRunner/KafkaIO invocation etc. somewhere.
> Sorry for the long email. Am losing my mind catching this moving target :))
> Appreciate your kind feedback on this.
> Cheers+have a great weekend.
> Amir-
>
>
>
>
>

Re: Any data gets cached?

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Raghu,
Is it the right assumption that, if results are not aggregated, we may have inconsistency in what the final result looks like?
What would be the best aggregation approach to guarantee consistency, even if there is a performance cost?
Thanks

      From: Raghu Angadi <ra...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Friday, July 29, 2016 2:32 PM
 Subject: Re: Any data gets cached?
   
I suggest you try aggregating using Beam primitives (GroupByKey, count etc), see if it produced in consistent results.
On Fri, Jul 29, 2016 at 12:09 PM, amir bahmanyari <am...@yahoo.com> wrote:

Sorry colleagues,I am having a moving target in my Beam app with FlinkRunner in a Flink Cluster of 2 nodes.Every run produces a different result while we know what the result MUST be: its an expected fixed number.I checked and see Kafka is NOT sending any extra records.My first suspect was Redis thread-UNsafe hashmap objects.I replaced them with Java ConcurrentHashMaps. Wow! it worked for the very first time every PERFECT.I then re-run exact same thing expecting the exact same previous result.But its different. Run it again. Another different wrong result. A different result each time.No code change nothing different.
I am wondering if some previous data/record gets cached by Beam/FlinkRunner/KafkaIO invocation etc. somewhere.Sorry for the long email. Am losing my mind catching this moving target :))Appreciate your kind feedback on this.Cheers+have a great weekend.Amir-



   

Re: Any data gets cached?

Posted by Raghu Angadi <ra...@google.com>.
I suggest you try aggregating using Beam primitives (GroupByKey, Count,
etc.) and see whether it produces consistent results.
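
For readers unfamiliar with the idea, the following is a minimal plain-Java sketch of "group by key, then combine". It is an analogy only: kv, groupByKey and sumPerKey are hypothetical helpers, no Beam API is used, and it says nothing about how FlinkRunner actually executes a pipeline. It shows that a keyed aggregation gives the same totals regardless of the order in which records arrive.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

public class KeyedAggregation {
    // Hypothetical convenience helper for building a key/value record.
    public static Map.Entry<String, Integer> kv(String key, int value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Collects all values that share a key (similar in spirit to a group-by-key step).
    public static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> records) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> record : records) {
            grouped.computeIfAbsent(record.getKey(), k -> new ArrayList<>()).add(record.getValue());
        }
        return grouped;
    }

    // Combines each group into a single sum per key.
    public static Map<String, Integer> sumPerKey(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : groupByKey(records).entrySet()) {
            int total = 0;
            for (int value : group.getValue()) {
                total += value;
            }
            sums.put(group.getKey(), total);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = new ArrayList<>();
        records.add(kv("x", 1));
        records.add(kv("y", 2));
        records.add(kv("x", 3));
        records.add(kv("y", 4));

        Map<String, Integer> inOrder = sumPerKey(records);
        // Simulate a different arrival order, as could happen with parallel workers.
        Collections.shuffle(records, new Random(42));
        Map<String, Integer> shuffled = sumPerKey(records);

        System.out.println(inOrder);                  // {x=4, y=6}
        System.out.println(inOrder.equals(shuffled)); // true
    }
}
```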

On Fri, Jul 29, 2016 at 12:09 PM, amir bahmanyari <am...@yahoo.com>
wrote:

> Sorry colleagues,
> I am having a moving target in my Beam app with FlinkRunner in a Flink
> Cluster of 2 nodes.
> Every run produces a different result while we know what the result MUST
> be: its an expected fixed number.
> I checked and see Kafka is NOT sending any extra records.
> My first suspect was Redis thread-UNsafe hashmap objects.
> I replaced them with Java ConcurrentHashMaps. Wow! it worked for the very
> first time every PERFECT.
> I then re-run exact same thing expecting the exact same previous result.
> But its different. Run it again. Another different wrong result. A
> different result each time.
> No code change nothing different.
>
> I am wondering if some previous data/record gets cached by
> Beam/FlinkRunner/KafkaIO invocation etc. somewhere.
> Sorry for the long email. Am losing my mind catching this moving target :))
> Appreciate your kind feedback on this.
> Cheers+have a great weekend.
> Amir-
>