Posted to user@beam.apache.org by amir bahmanyari <am...@yahoo.com> on 2016/08/08 06:44:46 UTC

Is Beam pipeline runtime behavior inconsistent?

Hi Colleagues,
I refrained from posting this email before completing thorough testing, and I think I have.
My core code works perfectly & produces the expected result every single time when it is not wrapped with Beam KafkaIO to receive the data.
Without KafkaIO, it receives the records from a flat data file. I repeated it and it always produced the right result.
With Beam KafkaIO included, and the exact same code embedded in an anonymous class running Beam pipelines, I get a different result every time I rerun it.
Below is the snippet from where KafkaIO executes until a record lands on the processElement method.
Kafka sends a precise number of records. No duplicates. All good.
While executing in Beam, when the records are finished & I expect a correct result, it always produces something different. Different in different runs.
I appreciate any light you can shed on this issue. And thanks for your valuable time as always.
Amir-
public static synchronized void main(String[] args) throws Exception {

// Create Beam Options for the Flink Runner.
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
// Set the Streaming engine as FlinkRunner
options.setRunner(FlinkPipelineRunner.class);
// This is a Streaming process (as opposed to Batch=false)
options.setStreaming(true);
// Create the DAG pipeline for parallel processing of independent LR records
Pipeline p = Pipeline.create(options);
// Kafka broker topic is identified as "lroad"
List<String> topics = Arrays.asList("lroad");

PCollection<String> kafkarecords = p.apply(KafkaIO.read()
        .withBootstrapServers("kafkahost:9092")
        .withTopics(topics)
        .withValueCoder(StringUtf8Coder.of())
        .withoutMetadata())
    .apply(Values.<String>create())
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes());

kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {

    public void processElement(ProcessContext ctx) throws Exception {
        // My core logic code here.
    }
}));
...
p.run(); // Start Beam Pipeline(s) in Flink cluster
} // of main
} // of class

Re: Is Beam pipeline runtime behavior inconsistent?

Posted by Kenneth Knowles <kl...@google.com>.
Hi Amir,

I suggest these changes:

 - Add a dependency on `beam-sdks-java-core`. This is the main artifact for
use in Java.
 - Remove the dependency on `beam-runners-core-java`. This is an artifact
for runner authors. You shouldn't need it.
 - Add a runtime-scoped dependency on `beam-runners-direct-java` for local
testing.
 - Add a runtime-scoped dependency on another runner. From your other
emails, it seems you might want `beam-runners-flink_2.10`.

Side note: Beam 0.2.0-incubating is released.
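Taken together, the dependencies section might look roughly like this (a sketch based on the suggestions above; the artifact IDs and versions should be double-checked against the release you actually use):

    <dependencies>
      <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>0.2.0-incubating</version>
      </dependency>
      <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-kafka</artifactId>
        <version>0.2.0-incubating</version>
      </dependency>
      <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>0.2.0-incubating</version>
        <scope>runtime</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-flink_2.10</artifactId>
        <version>0.2.0-incubating</version>
        <scope>runtime</scope>
      </dependency>
    </dependencies>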

Kenn

On Tue, Aug 9, 2016 at 10:42 AM, amir bahmanyari <am...@yahoo.com>
wrote:

> Hi Kenn,
> Following is an extract from my pom. It works for me the way it is.
> But I appreciate any suggestions for improvement.
> Thanks+have a great day.
> Cheers
>
>     <dependencies>
> <dependency>
>    <groupId>org.apache.beam</groupId>
>    <artifactId>beam-runners-core-java</artifactId>
>    <version>0.1.0-incubating</version>
> </dependency>
>
>
> <dependency>
>   <groupId>biz.paluch.redis</groupId>
>   <artifactId>lettuce</artifactId>
>   <version>3.4.3.Final</version>
> </dependency>
>
> <dependency>
>       <groupId>org.apache.beam</groupId>
>       <artifactId>flink-runner_2.10</artifactId>
>      <version>0.1.0-incubating-SNAPSHOT</version>
>     </dependency>
>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-io-kafka</artifactId>
>     <version>0.1.0-incubating</version>
> </dependency>
>     </dependencies>
>
>     <build>
>         <plugins>
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-shade-plugin</artifactId>
>                 <version>2.4.1</version>
>                 <executions>
>                     <execution>
>                         <phase>package</phase>
>                         <goals>
>                             <goal>shade</goal>
>                         </goals>
>                         <configuration>
>                             <transformers>
>                                 <transformer implementation="org.apache.
> maven.plugins.shade.resource.ManifestResourceTransformer">
>                                     <mainClass>benchmark.flinkspark.flink.
> BenchBeamRunners</mainClass>
>                                 </transformer>
>                             </transformers>
>                             <artifactSet>
>                                 <excludes>
>                                     <exclude>org.apache.flink:*</exclude>
>                                 </excludes>
>                             </artifactSet>
>                         </configuration>
>                     </execution>
>                 </executions>
>             </plugin>
> <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-compiler-plugin</artifactId>
>                 <version>3.1</version>
>                 <configuration>
>                     <source>1.8</source>
>                     <target>1.8</target>
>                 </configuration>
>             </plugin>
>         </plugins>
>
>     </build>
> </project>
>
>
>
> ------------------------------
> *From:* Kenneth Knowles <kl...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Monday, August 8, 2016 7:29 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Hi Amir,
>
> How are you assembling your classpath? The library that contains that
> class should be automatically included.
>
> Kenn
>
> On Mon, Aug 8, 2016 at 5:30 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Yes :) I figured it out and it compiled.
> Now ClassNotFound at runtime: org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn
> Researching the right runners package...
> Thanks 1000000000000 Thomas... hope I will see reasonable results, at least
> just to convince my own team why we get different results at different runs
> for a GOOD REASON..
> Cheers
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 5:26 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Missed a ParDo; that last line should be .apply(ParDo.of(new DoFn...))
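>
> With that change, the tail of the snippet quoted below would read (a sketch; only the final apply changes):
>
> kafkarecords.apply(WithKeys.<Integer, String>of(1))
>     .apply(GroupByKey.<Integer, String>create())
>     .apply(Values.<Iterable<String>>create())
>     .apply(ParDo.of(new DoFn<Iterable<String>, String>() { ... }));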
>
> On Mon, Aug 8, 2016 at 4:56 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Thomas,
> I used the following & get a compilation error:
> "The method apply(PTransform<? super PCollection<Iterable<String>>,
> OutputT>) in the type PCollection<Iterable<String>> is not applicable for
> the arguments (new DoFn<Iterable<String>,String>( ){})"
>
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers(" kafkahost:9092").withTopics( topics).
> withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply(
> Values.<String>create()). apply(Window.<String>into(
> FixedWindows.of(Duration. standardMinutes(1)))
>           .triggering(AfterWatermark. pastEndOfWindow()).
> withAllowedLateness(Duration. ZERO)
>     .discardingFiredPanes());
>
> kafkarecords.apply(WithKeys.< Integer, String>of(1))
>     .apply(GroupByKey.<Integer, String>create())
>     .apply(Values.<Iterable< String>>create())
>     .apply(new DoFn<Iterable<String>, String>() {......
>
>
> I compared with some GroupByKey examples ...nothing that matched it.
> Should I simplify the KafkaIO() call to avoid this compilation error?
> Thanks for your help.
> Amir-
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 3:50 PM
>
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> You would perform no better than single-threaded execution if you group
> everything onto a single key, which is why this approach is strongly
> discouraged. You can still get continuous output, depending on the
> triggering, but you lose all of the scaling benefits of running a pipeline as
> opposed to a simple Java program, and may incur some additional overhead.
>
> To enforce this sort of threading you would do something along the lines
> of:
>
> kafkarecords.apply(WithKeys.<Integer, String>of(1))
>     .apply(GroupByKey.<Integer, String>create())
>     .apply(Values.<Iterable<String>>create())
>     .apply(new DoFn<Iterable<String>, String>() {...});
>
> Where the DoFn unrolls its input and on each element applies the
> processing.
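>
> For illustration, a sketch of such an unrolling DoFn (the per-record helper
> processRecord is hypothetical and stands in for the existing logic):
>
> new DoFn<Iterable<String>, String>() {
>   public void processElement(ProcessContext ctx) throws Exception {
>     // All elements share the single static key, so they arrive as one Iterable.
>     for (String record : ctx.element()) {
>       // Apply the existing per-record logic to one element at a time.
>       ctx.output(processRecord(record));
>     }
>   }
> }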
>
>
> On Mon, Aug 8, 2016 at 2:37 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Thanks so much Thomas.
> Fantastic answer & great learning about what's really going on under
> the hood.
> I have a question on your suggestion: "To do so, you would key the inputs
> to a single static key and apply a GroupByKey, running the processing
> method on the output Iterable produced by the GroupByKey"...
> Wouldn't doing that defeat the "real-time streaming" objectives?
> To me it amounts to simulating a simple single-threaded Java
> process, except that it executes on massively parallel infrastructure in
> a "fancy" way :-)
> Is there an example that demonstrates how to actually implement your
> suggestion above without any hidden loopholes pls?
> I can at least try it and see how far it gets for R&D purposes & share the
> results with the community.
> Cheers+have a wonderful day.
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 1:44 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> There's no way to guarantee that exactly one record is processed at a
> time. This is part of the design of ParDo to work efficiently across
> multiple processes and machines[1], where multiple instances of a DoFn must
> exist in order for progress to be made in a timely fashion. This includes
> processing the same element across multiple machines at the same time, with
> only one of the results being available in the output PCollection, as well
> as retries of failed elements.
>
> A runner is required to interact with a DoFn instance in a single-threaded
> manner - however, it is permitted to have multiple different DoFn instances
> active within a single process and across processes at any given time (for
> the same reasons as above). There's no support in the Beam model to
> restrict this type of execution. We do not encourage sharing objects
> between DoFn instances, and any shared state must be accessed in a
> thread-safe manner, and modifications to shared state should be idempotent,
> as otherwise retries and speculative execution may cause that state to be
> inconsistent. A DoFn will be reused for multiple elements across a single
> bundle, and may be reused across multiple bundles - if you require the DoFn
> to be "fresh" per element, it should perform any required setup at the
> start of the ProcessElement method.
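>
> As a small sketch of that last point (illustrative only; the field name is
> made up):
>
> new DoFn<String, String>() {
>   private Map<String, Long> scratch; // the DoFn instance may be reused across elements and bundles
>
>   public void processElement(ProcessContext ctx) throws Exception {
>     scratch = new HashMap<>(); // re-initialize here if the state must be fresh for every element
>     // ... per-record logic using scratch ...
>   }
> }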
>
> The best that can be done if it is absolutely required to restrict
> processing to a single element at a time would be to group all of the
> elements to a single key. Note that this will not solve the problem in all
> cases, as a runner is permitted to execute the group of elements multiple
> times so long as it only takes one completed bundle as the result, and
> additionally this removes the ability of the runner to balance work and
> introduces a performance bottleneck. To do so, you would key the inputs to
> a single static key and apply a GroupByKey, running the processing method
> on the output Iterable produced by the GroupByKey (directly; expanding the
> input iterable in a separate PCollection allows a runner to rebalance the
> elements, which will reintroduce parallelism).
>
> [1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360
>
> On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Thomas,
> Thanks so much for your response. Here are answers to your questions.
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> ==>> I send records to Kafka from my laptop. I use KafkaIO() to receive
> the records. I have confirmed that I don't get duplicates from Kafka.
> However, for some reason, certain parts of my code execute more times than
> the expected number of records, and subsequently produce extra resulting
> data.
> I tried playing with the triggering: stretching the window interval,
> discardingFiredPanes, etc., all kinds of modes.
> Same result. How can I guarantee that one record at a time executes in one
> unique instance of the inner class object?
> I have all the shared objects synchronized and am using Java concurrent
> hashmaps. How can I guarantee synchronized operations amongst "parallel
> pipelines"? Analogous to multiple threads accessing a shared object and
> trying to modify it...
>
> Here is my current KafkaIO() call:
> PCollection<String> kafkarecords = p.apply(KafkaIO.read()
>         .withBootstrapServers("kafkahost:9092")
>         .withTopics(topics)
>         .withValueCoder(StringUtf8Coder.of())
>         .withoutMetadata())
>     .apply(Values.<String>create())
>     .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes());
>
>     kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new
> DoFn<String, String>() { // I expect one record at a time to one object here
> ----------------------------------------------------------------------
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
> ==>> No duplicates from Kafka.
> ----------------------------------------------------------------------
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
> ==>> Sorry for my confusing statement. Like I mentioned above, I expect
> each record coming from Kafka to get assigned to one instance of the inner
> class, and therefore one instance of the pipeline executes it in parallel
> with others executing their own unique records.
>
> ----------------------------------------------------------------------
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
> ==>>I have not tried DirectRunner. Should I?
>
> Thanks so much Thomas.
>
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 11:43 AM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Just to make sure I understand the problem:
>
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
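>
> A minimal sketch of what such a check might look like (illustrative only;
> withoutDefaults() is assumed to be needed because the collection above is
> windowed rather than globally windowed):
>
> kafkarecords
>     .apply(Count.<String>globally().withoutDefaults())
>     .apply(ParDo.of(new DoFn<Long, Void>() {
>       public void processElement(ProcessContext ctx) throws Exception {
>         // Log the number of elements counted in each fired pane.
>         System.out.println("Elements counted: " + ctx.element());
>       }
>     }));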
>
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
>
>
> On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Colleagues,
> I refrained from posting this email before completing thorough testing.
> I think I did.
> My core code works perfectly & produces the expected result every single time
> when it is not wrapped with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated
> it and it always produced the right result.
> With Beam KafkaIO included, and the exact same code embedded in an anonymous
> class running Beam pipelines, I get a different result every time I rerun
> it.
> Below is the snippet from where KafkaIO executes till a record lands on
> method.
> Kafka sends precise number of records. No duplicates. all good.
> While executing in Beam, when the records are finished & I expect a
> correct result, it always produces something different.
> Different in different runs.
> I appreciate shedding light on this issue.  And thanks for your valuable
> time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
> // Create Beam Options for the Flink Runner.
> FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> // Set the Streaming engine as FlinkRunner
> options.setRunner(FlinkPipelineRunner.class);
> // This is a Streaming process (as opposed to Batch=false)
> options.setStreaming(true);
> // Create the DAG pipeline for parallel processing of independent LR records
> Pipeline p = Pipeline.create(options);
> // Kafka broker topic is identified as "lroad"
> List<String> topics = Arrays.asList("lroad");
>
> PCollection<String> kafkarecords = p.apply(KafkaIO.read()
>         .withBootstrapServers("kafkahost:9092")
>         .withTopics(topics)
>         .withValueCoder(StringUtf8Coder.of())
>         .withoutMetadata())
>     .apply(Values.<String>create())
>     .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.ZERO)
>         .accumulatingFiredPanes());
>
> kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
>
>     public void processElement(ProcessContext ctx) throws Exception {
>
>         *My core logic code here.*
>     }
> }));
> .
> .
> p.run(); // Start Beam Pipeline(s) in Flink cluster
> } // of main
> }// of class

Re: Is Beam pipeline runtime behavior inconsistent?

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Kenn,Following is an extract from my pom. It works for me the way it is.But, I appreciate any improvements suggestions.Thanks+have a great day.Cheers
    <dependencies><dependency>   <groupId>org.apache.beam</groupId>   <artifactId>beam-runners-core-java</artifactId>   <version>0.1.0-incubating</version></dependency>

<dependency>  <groupId>biz.paluch.redis</groupId>  <artifactId>lettuce</artifactId>  <version>3.4.3.Final</version></dependency> 
<dependency>      <groupId>org.apache.beam</groupId>      <artifactId>flink-runner_2.10</artifactId>     <version>0.1.0-incubating-SNAPSHOT</version>           </dependency> 
<dependency>    <groupId>org.apache.beam</groupId>    <artifactId>beam-sdks-java-io-kafka</artifactId>    <version>0.1.0-incubating</version></dependency>     </dependencies>
    <build>        <plugins>            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-shade-plugin</artifactId>                <version>2.4.1</version>                <executions>                    <execution>                        <phase>package</phase>                        <goals>                            <goal>shade</goal>                        </goals>                        <configuration>                            <transformers>                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">                                    <mainClass>benchmark.flinkspark.flink.BenchBeamRunners</mainClass>                                </transformer>                            </transformers>                            <artifactSet>                                <excludes>                                    <exclude>org.apache.flink:*</exclude>                                </excludes>                            </artifactSet>                        </configuration>                    </execution>                </executions>            </plugin><plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-compiler-plugin</artifactId>                <version>3.1</version>                <configuration>                    <source>1.8</source>                    <target>1.8</target>                </configuration>            </plugin>        </plugins>
    </build></project>


      From: Kenneth Knowles <kl...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 7:29 PM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
Hi Amir,
How are you assembling your classpath? The library that contains that class should be automatically included.
Kenn
On Mon, Aug 8, 2016 at 5:30 PM, amir bahmanyari <am...@yahoo.com> wrote:

Yes :) I figured it out and it compiled.now ClassNotFound ar runtime: org.apache.beam.runners.core. GroupAlsoByWindowViaWindowSetD oFn
Researching the right runners package...Thanks 1000000000000 Thomas...hope I will see reasonable results at least just to convince my own team why we get different results at different runs for a GEEAD REASON..Cheers

      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org ; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 5:26 PM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
Missed a ParDo; that last line should be .apply(ParDo.of(new DoFn...))
On Mon, Aug 8, 2016 at 4:56 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Thomas,I used the following & get a compilation error:"The method apply(PTransform<? super PCollection<Iterable<String>>, OutputT>) in the type PCollection<Iterable<String>> is not applicable for the arguments (new DoFn<Iterable<String>,String>( ){})"
PCollection<String> kafkarecords = p.apply(KafkaIO.read(). withBootstrapServers(" kafkahost:9092").withTopics( topics). withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply( Values.<String>create()). apply(Window.<String>into( FixedWindows.of(Duration. standardMinutes(1)))           .triggering(AfterWatermark. pastEndOfWindow()). withAllowedLateness(Duration. ZERO)     .discardingFiredPanes());
kafkarecords.apply(WithKeys.< Integer, String>of(1))    .apply(GroupByKey.<Integer, String>create())    .apply(Values.<Iterable< String>>create())    .apply(new DoFn<Iterable<String>, String>() {......

I compared with some GroupByKey examples ...nothing that matched it.Should I simplify the KafkaIO() call to avoid this compilation error?Thanks for your help.Amir-      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org ; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 3:50 PM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
You would performance no better than single-threaded behavior if you group everything into a single key, hence why this approach is strongly not recommended. You can still get continuous output, depending on the triggering, but you lose all of scaling benefits of running a pipeline as opposed to a simple Java program, plus may incur some additional overhead.
To enforce this sort of threading you would do something among the lines of:
kafkarecords.apply(WithKeys.< Integer, String>of(1))    .apply(GroupByKey.<Integer, String>create())    .apply(Values.<Iterable< String>>create())    .apply(new DoFn<Iterable<String>, String>() {...});
Where the DoFn unrolls its input and on each element applies the processing.

On Mon, Aug 8, 2016 at 2:37 PM, amir bahmanyari <am...@yahoo.com> wrote:

Thanks so much Thomas. Fantastic answer & great learning about whats really going on underneath the hood.Have a question on your suggestion: "To do so, you would key the inputs to a single static key and apply a GroupByKey, running the processing method on the output Iterable produced by the GroupByKey"...Wouldn't doing such defeats the "real-time Streaming" objectives?To me the above leads to a simulation of a simple single threaded java process but its executing in a massively parallel infrastructure in  a"fancy" way :-)Is there an example that demonstrates how to actually implement your suggestion above without any hidden loopholes pls? I can at least try it and see how far it gets for R&D purposes & share the results with the community.Cheers+have a wonderful day.
      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org ; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 1:44 PM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
There's no way to guarantee that exactly one record is processed at a time. This is part of the design of ParDo to work efficiently across multiple processes and machines[1], where multiple instances of a DoFn must exist in order for progress to be made in a timely fashion. This includes processing the same element across multiple machines at the same time, with only one of the results being available in the output PCollection, as well as retries of failed elements.
A runner is required to interact with a DoFn instance in a single-threaded manner - however, it is permitted to have multiple different DoFn instances active within a single process and across processes at any given time (for the same reasons as above). There's no support in the Beam model to restrict this type of execution. We do not encourage sharing objects between DoFn instances, and any shared state must be accessed in a thread-safe manner, and modifications to shared state should be idempotent, as otherwise retries and speculative execution may cause that state to be inconsistent. A DoFn will be reused for multiple elements across a single bundle, and may be reused across multiple bundles - if you require the DoFn to be "fresh" per element, it should perform any required setup at the start of the ProcessElement method.
The best that can be done if it is absolutely required to restrict processing to a single element at a time would be to group all of the elements to a single key. Note that this will not solve the problem in all cases, as a runner is permitted to execute the group of elements multiple times so long as it only takes one completed bundle as the result, and additionally this removes the ability of the runner to balance work and introduces a performance bottleneck. To do so, you would key the inputs to a single static key and apply a GroupByKey, running the processing method on the output Iterable produced by the GroupByKey (directly; expanding the input iterable in a separate PCollection allows a runner to rebalance the elements, which will reintroduce parallelism)`.
[1] https://github.com/apache/ incubator-beam/blob/master/ sdks/java/core/src/main/java/ org/apache/beam/sdk/ transforms/ParDo.java#L360
On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Thomas,Thanks so much for your response. Here are answers to your questions.You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
==>> I send records to Kafka from my laptop. I use KafkaIO() to receive the records. I have confirmed that I dont get duplicates from Kafka. However,for some reason, certain parts of my code execute beyond the actual number of expected number of records, and subsequently produce extra resulting data. I tried playing with the Triggering. Stretching the window interval, DiscardingFiredPanes etc. all kinds of modes.Same.  How can I guarantee that one record at a time executes in one unique instance of the inner class object?I have all the shared objects synchronized and am using Java concurrent hashmaps. How can I guarantee synchronized operations amongst "parallel pipelines"? Analogous to multiple threads accessing a shared object and trying to modify it...
Here is my current KafkaIO() call: PCollection<String> kafkarecords = p.apply(KafkaIO.read(). withBootstrapServers(" kafkahost:9092").withTopics( topics). withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply( Values.<String>create()). apply(Window.<String>into( FixedWindows.of(Duration. standardMinutes(1)))           .triggering(AfterWatermark. pastEndOfWindow()). withAllowedLateness(Duration. ZERO)     .discardingFiredPanes());               kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new DoFn<String, String>() {.//I expect one record at a time to one object here------------------------------ ------------------------------ ------------------------------ ------------------------------ -----------------------
Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to k afkarecords)?==>>No duplicates from Kafka.------------------------------ ------------------------------ ------------------------------ ------------------------------ -----------------------Additionally, I'm not sure what you mean by "executes till a record lands on method"==>>Sorry for my confusing statement. Like I mentioned above, I expect each record coming from Kafka gets assigned to one instance of the inner class and therefore one instance of the pipeline executed it in parallel with others executing their own unique records.
------------------------------ ------------------------------ ------------------------------ ------------------------------ -----------------------
Additionally additionally, is this reproducible if you execute with the DirectRunner? ==>>I have not tried DirectRunner. Should I? 
Thanks so much Thomas.

      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org ; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 11:43 AM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
  
Just to make sure I understand the problem:

You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to kafkarecords)?
Additionally, I'm not sure what you mean by "executes till a record lands on method"
Additionally additionally, is this reproducible if you execute with the DirectRunner?


On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Colleagues,I refrained from posting this email before completing thorough testing.I think I did.My core code works perfect & produces the expect result every single time without wrapping it with Beam KafkaIO to receive the data.Without KafkaIO, it receives the records from a flat data file. I repeated it and it always produced the right result.With including a Beam KarkaIO and embedding exact same code in a anonymous class running Beam pipelines, I get a different result every time I rerun it.Below is the snippet from where KafkaIO executes till a record lands on method.Kafka sends precise number of records. No duplicates. all good.While executing in Beam, when the records are finished & I expect a correct result, it always produces something different. Different in different runs.I appreciate shedding light on this issue.  And thanks for your valuable time as always.Amir-
public static synchronized void main(String[] args) throws Exception {

// Create Beam Options for the Flink Runner. FlinkPipelineOptions options = PipelineOptionsFactory.as( FlinkPipelineOptions.class); // Set the Streaming engine as FlinkRunner options.setRunner( FlinkPipelineRunner.class); // This is a Streaming process (as opposed to Batch=false) options.setStreaming(true); //Create the DAG pipeline for parallel processing of independent LR records Pipeline p = Pipeline.create(options); //Kafka broker topic is identified as "lroad"  List<String> topics = Arrays.asList("lroad");
PCollection<String> kafkarecords = p.apply(KafkaIO.read(). withBootstrapServers(" kafkahost:9092").withTopics( topics). withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply( Values.<String>create()). apply(Window.<String>into( FixedWindows.of(Duration. standardMinutes(1)))           .triggering(AfterWatermark. pastEndOfWindow()). withAllowedLateness(Duration. ZERO)     .accumulatingFiredPanes());               kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new DoFn<String, String>() {                        
                        public void processElement(ProcessContext ctx) throws Exception {

                                        My core logic code here.
}));..p.run(); // Start Beam Pipeline(s) in FlinkC Cluster
} // of main}// of class



   



   



   



   



  

Re: Is Beam pipeline runtime behavior inconsistent?

Posted by Kenneth Knowles <kl...@google.com>.
Hi Amir,

How are you assembling your classpath? The library that contains that class
should be automatically included.

Kenn

On Mon, Aug 8, 2016 at 5:30 PM, amir bahmanyari <am...@yahoo.com> wrote:

> Yes :) I figured it out and it compiled.
> now ClassNotFound ar runtime: org.apache.beam.runners.core.
> GroupAlsoByWindowViaWindowSetDoFn
> Researching the right runners package...
> Thanks 1000000000000 Thomas...hope I will see reasonable results at least
> just to convince my own team why we get different results at different runs
> for a GEEAD REASON..
> Cheers
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Monday, August 8, 2016 5:26 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Missed a ParDo; that last line should be .apply(ParDo.of(new DoFn...))
>
> On Mon, Aug 8, 2016 at 4:56 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Thomas,
> I used the following & get a compilation error:
> "The method apply(PTransform<? super PCollection<Iterable<String>>,
> OutputT>) in the type PCollection<Iterable<String>> is not applicable for
> the arguments (new DoFn<Iterable<String>,String>( ){})"
>
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers(" kafkahost:9092").withTopics( topics).
> withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply(
> Values.<String>create()). apply(Window.<String>into(
> FixedWindows.of(Duration. standardMinutes(1)))
>           .triggering(AfterWatermark. pastEndOfWindow()).
> withAllowedLateness(Duration. ZERO)
>     .discardingFiredPanes());
>
> kafkarecords.apply(WithKeys.< Integer, String>of(1))
>     .apply(GroupByKey.<Integer, String>create())
>     .apply(Values.<Iterable< String>>create())
>     .apply(new DoFn<Iterable<String>, String>() {......
>
>
> I compared with some GroupByKey examples ...nothing that matched it.
> Should I simplify the KafkaIO() call to avoid this compilation error?
> Thanks for your help.
> Amir-
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 3:50 PM
>
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> You would performance no better than single-threaded behavior if you group
> everything into a single key, hence why this approach is strongly not
> recommended. You can still get continuous output, depending on the
> triggering, but you lose all of scaling benefits of running a pipeline as
> opposed to a simple Java program, plus may incur some additional overhead.
>
> To enforce this sort of threading you would do something among the lines
> of:
>
> kafkarecords.apply(WithKeys.< Integer, String>of(1))
>     .apply(GroupByKey.<Integer, String>create())
>     .apply(Values.<Iterable< String>>create())
>     .apply(new DoFn<Iterable<String>, String>() {...});
>
> Where the DoFn unrolls its input and on each element applies the
> processing.
>
>
> On Mon, Aug 8, 2016 at 2:37 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Thanks so much Thomas.
> Fantastic answer & great learning about whats really going on underneath
> the hood.
> Have a question on your suggestion: "To do so, you would key the inputs
> to a single static key and apply a GroupByKey, running the processing
> method on the output Iterable produced by the GroupByKey"...
> Wouldn't doing such defeats the "real-time Streaming" objectives?
> To me the above leads to a simulation of a simple single threaded java
> process but its executing in a massively parallel infrastructure in
>  a"fancy" way :-)
> Is there an example that demonstrates how to actually implement your
> suggestion above without any hidden loopholes pls?
> I can at least try it and see how far it gets for R&D purposes & share the
> results with the community.
> Cheers+have a wonderful day.
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 1:44 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> There's no way to guarantee that exactly one record is processed at a
> time. This is part of the design of ParDo to work efficiently across
> multiple processes and machines[1], where multiple instances of a DoFn must
> exist in order for progress to be made in a timely fashion. This includes
> processing the same element across multiple machines at the same time, with
> only one of the results being available in the output PCollection, as well
> as retries of failed elements.
>
> A runner is required to interact with a DoFn instance in a single-threaded
> manner - however, it is permitted to have multiple different DoFn instances
> active within a single process and across processes at any given time (for
> the same reasons as above). There's no support in the Beam model to
> restrict this type of execution. We do not encourage sharing objects
> between DoFn instances, and any shared state must be accessed in a
> thread-safe manner, and modifications to shared state should be idempotent,
> as otherwise retries and speculative execution may cause that state to be
> inconsistent. A DoFn will be reused for multiple elements across a single
> bundle, and may be reused across multiple bundles - if you require the DoFn
> to be "fresh" per element, it should perform any required setup at the
> start of the ProcessElement method.
>
> The best that can be done if it is absolutely required to restrict
> processing to a single element at a time would be to group all of the
> elements to a single key. Note that this will not solve the problem in all
> cases, as a runner is permitted to execute the group of elements multiple
> times so long as it only takes one completed bundle as the result, and
> additionally this removes the ability of the runner to balance work and
> introduces a performance bottleneck. To do so, you would key the inputs to
> a single static key and apply a GroupByKey, running the processing method
> on the output Iterable produced by the GroupByKey (directly; expanding the
> input iterable in a separate PCollection allows a runner to rebalance the
> elements, which will reintroduce parallelism)`.
>
> [1] https://github.com/apache/ incubator-beam/blob/master/
> sdks/java/core/src/main/java/ org/apache/beam/sdk/
> transforms/ParDo.java#L360
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360>
>
> On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Thomas,
> Thanks so much for your response. Here are answers to your questions.
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> ==>> I send records to Kafka from my laptop. I use KafkaIO() to receive
> the records. I have confirmed that I dont get duplicates from Kafka.
> However,
> for some reason, certain parts of my code execute beyond the actual number
> of expected number of records, and subsequently produce extra resulting
> data.
> I tried playing with the Triggering. Stretching the window interval,
> DiscardingFiredPanes etc. all kinds of modes.
> Same.  How can I guarantee that one record at a time executes in one
> unique instance of the inner class object?
> I have all the shared objects synchronized and am using Java concurrent
> hashmaps. How can I guarantee synchronized operations amongst "parallel
> pipelines"? Analogous to multiple threads accessing a shared object and
> trying to modify it...
>
> Here is my current KafkaIO() call:
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers(" kafkahost:9092").withTopics( topics).
> withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply(
> Values.<String>create()). apply(Window.<String>into(
> FixedWindows.of(Duration. standardMinutes(1)))
>          .triggering(AfterWatermark. pastEndOfWindow()).
> withAllowedLateness(Duration. ZERO)
>    .discardingFiredPanes());
>
>     kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new
> DoFn<String, String>() {.//I expect one record at a time to one object here
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to k afkarecords)?
> ==>>No duplicates from Kafka.
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
> ==>>Sorry for my confusing statement. Like I mentioned above, I expect
> each record coming from Kafka gets assigned to one instance of the inner
> class and therefore one instance of the pipeline executed it in parallel
> with others executing their own unique records.
>
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
> ==>>I have not tried DirectRunner. Should I?
>
> Thanks so much Thomas.
>
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 11:43 AM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Just to make sure I understand the problem:
>
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
>
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
>
>
> On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Colleagues,
> I refrained from posting this email before completing thorough testing.
> I think I did.
> My core code works perfect & produces the expect result every single time
> without wrapping it with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated
> it and it always produced the right result.
> With including a Beam KarkaIO and embedding exact same code in a anonymous
> class running Beam pipelines, I get a different result every time I rerun
> it.
> Below is the snippet from where KafkaIO executes till a record lands on
> method.
> Kafka sends precise number of records. No duplicates. all good.
> While executing in Beam, when the records are finished & I expect a
> correct result, it always produces something different.
> Different in different runs.
> I appreciate shedding light on this issue.  And thanks for your valuable
> time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
> // Create Beam Options for the Flink Runner.
> FlinkPipelineOptions options = PipelineOptionsFactory.as(
> FlinkPipelineOptions.class);
> // Set the Streaming engine as FlinkRunner
> options.setRunner( FlinkPipelineRunner.class);
> // This is a Streaming process (as opposed to Batch=false)
> options.setStreaming(true);
> //Create the DAG pipeline for parallel processing of independent LR records
> Pipeline p = Pipeline.create(options);
> //Kafka broker topic is identified as "lroad"
> List<String> topics = Arrays.asList("lroad");
>
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers(" kafkahost:9092").withTopics( topics).
> withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply(
> Values.<String>create()). apply(Window.<String>into(
> FixedWindows.of(Duration. standardMinutes(1)))
>          .triggering(AfterWatermark. pastEndOfWindow()).
> withAllowedLateness(Duration. ZERO)
>    .accumulatingFiredPanes());
>
>     kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new
> DoFn<String, String>() {
>
>                         public void processElement(ProcessContext ctx)
> throws Exception {
>
>                                         *My core logic code here.*
> }));
> .
> .
> p.run(); // Start Beam Pipeline(s) in FlinkC Cluster
> } // of main
> }// of class
>
>
>
>
>
>
>
>
>
>
>
>
>
>

Re: Is Beam pipeline runtime behavior inconsistent?

Posted by amir bahmanyari <am...@yahoo.com>.
Yes :) I figured it out and it compiled.now ClassNotFound ar runtime: org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn
Researching the right runners package...Thanks 1000000000000 Thomas...hope I will see reasonable results at least just to convince my own team why we get different results at different runs for a GEEAD REASON..Cheers

      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 5:26 PM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
Missed a ParDo; that last line should be .apply(ParDo.of(new DoFn...))
On Mon, Aug 8, 2016 at 4:56 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Thomas,I used the following & get a compilation error:"The method apply(PTransform<? super PCollection<Iterable<String>>, OutputT>) in the type PCollection<Iterable<String>> is not applicable for the arguments (new DoFn<Iterable<String>,String>( ){})"
PCollection<String> kafkarecords = p.apply(KafkaIO.read(). withBootstrapServers(" kafkahost:9092").withTopics( topics). withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply( Values.<String>create()). apply(Window.<String>into( FixedWindows.of(Duration. standardMinutes(1)))           .triggering(AfterWatermark. pastEndOfWindow()). withAllowedLateness(Duration. ZERO)     .discardingFiredPanes());
kafkarecords.apply(WithKeys.< Integer, String>of(1))    .apply(GroupByKey.<Integer, String>create())    .apply(Values.<Iterable< String>>create())    .apply(new DoFn<Iterable<String>, String>() {......

I compared with some GroupByKey examples ...nothing that matched it.Should I simplify the KafkaIO() call to avoid this compilation error?Thanks for your help.Amir-      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org ; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 3:50 PM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
You would performance no better than single-threaded behavior if you group everything into a single key, hence why this approach is strongly not recommended. You can still get continuous output, depending on the triggering, but you lose all of scaling benefits of running a pipeline as opposed to a simple Java program, plus may incur some additional overhead.
To enforce this sort of threading you would do something among the lines of:
kafkarecords.apply(WithKeys.< Integer, String>of(1))    .apply(GroupByKey.<Integer, String>create())    .apply(Values.<Iterable< String>>create())    .apply(new DoFn<Iterable<String>, String>() {...});
Where the DoFn unrolls its input and on each element applies the processing.

On Mon, Aug 8, 2016 at 2:37 PM, amir bahmanyari <am...@yahoo.com> wrote:

Thanks so much Thomas. Fantastic answer & great learning about whats really going on underneath the hood.Have a question on your suggestion: "To do so, you would key the inputs to a single static key and apply a GroupByKey, running the processing method on the output Iterable produced by the GroupByKey"...Wouldn't doing such defeats the "real-time Streaming" objectives?To me the above leads to a simulation of a simple single threaded java process but its executing in a massively parallel infrastructure in  a"fancy" way :-)Is there an example that demonstrates how to actually implement your suggestion above without any hidden loopholes pls? I can at least try it and see how far it gets for R&D purposes & share the results with the community.Cheers+have a wonderful day.
      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org ; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 1:44 PM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
There's no way to guarantee that exactly one record is processed at a time. This is part of the design of ParDo to work efficiently across multiple processes and machines[1], where multiple instances of a DoFn must exist in order for progress to be made in a timely fashion. This includes processing the same element across multiple machines at the same time, with only one of the results being available in the output PCollection, as well as retries of failed elements.
A runner is required to interact with a DoFn instance in a single-threaded manner - however, it is permitted to have multiple different DoFn instances active within a single process and across processes at any given time (for the same reasons as above). There's no support in the Beam model to restrict this type of execution. We do not encourage sharing objects between DoFn instances, and any shared state must be accessed in a thread-safe manner, and modifications to shared state should be idempotent, as otherwise retries and speculative execution may cause that state to be inconsistent. A DoFn will be reused for multiple elements across a single bundle, and may be reused across multiple bundles - if you require the DoFn to be "fresh" per element, it should perform any required setup at the start of the ProcessElement method.
The best that can be done if it is absolutely required to restrict processing to a single element at a time would be to group all of the elements to a single key. Note that this will not solve the problem in all cases, as a runner is permitted to execute the group of elements multiple times so long as it only takes one completed bundle as the result, and additionally this removes the ability of the runner to balance work and introduces a performance bottleneck. To do so, you would key the inputs to a single static key and apply a GroupByKey, running the processing method on the output Iterable produced by the GroupByKey (directly; expanding the input iterable in a separate PCollection allows a runner to rebalance the elements, which will reintroduce parallelism)`.
[1] https://github.com/apache/ incubator-beam/blob/master/ sdks/java/core/src/main/java/ org/apache/beam/sdk/ transforms/ParDo.java#L360
On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Thomas,Thanks so much for your response. Here are answers to your questions.You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
==>> I send records to Kafka from my laptop. I use KafkaIO() to receive the records. I have confirmed that I dont get duplicates from Kafka. However,for some reason, certain parts of my code execute beyond the actual number of expected number of records, and subsequently produce extra resulting data. I tried playing with the Triggering. Stretching the window interval, DiscardingFiredPanes etc. all kinds of modes.Same.  How can I guarantee that one record at a time executes in one unique instance of the inner class object?I have all the shared objects synchronized and am using Java concurrent hashmaps. How can I guarantee synchronized operations amongst "parallel pipelines"? Analogous to multiple threads accessing a shared object and trying to modify it...
Here is my current KafkaIO() call: PCollection<String> kafkarecords = p.apply(KafkaIO.read(). withBootstrapServers(" kafkahost:9092").withTopics( topics). withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply( Values.<String>create()). apply(Window.<String>into( FixedWindows.of(Duration. standardMinutes(1)))           .triggering(AfterWatermark. pastEndOfWindow()). withAllowedLateness(Duration. ZERO)     .discardingFiredPanes());               kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new DoFn<String, String>() {.//I expect one record at a time to one object here------------------------------ ------------------------------ ------------------------------ ------------------------------ -----------------------
Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to k afkarecords)?==>>No duplicates from Kafka.------------------------------ ------------------------------ ------------------------------ ------------------------------ -----------------------Additionally, I'm not sure what you mean by "executes till a record lands on method"==>>Sorry for my confusing statement. Like I mentioned above, I expect each record coming from Kafka gets assigned to one instance of the inner class and therefore one instance of the pipeline executed it in parallel with others executing their own unique records.
------------------------------ ------------------------------ ------------------------------ ------------------------------ -----------------------
Additionally additionally, is this reproducible if you execute with the DirectRunner? ==>>I have not tried DirectRunner. Should I? 
Thanks so much Thomas.

      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org ; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 11:43 AM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
  
Just to make sure I understand the problem:

You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to kafkarecords)?
Additionally, I'm not sure what you mean by "executes till a record lands on method"
Additionally additionally, is this reproducible if you execute with the DirectRunner?


On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Colleagues,I refrained from posting this email before completing thorough testing.I think I did.My core code works perfect & produces the expect result every single time without wrapping it with Beam KafkaIO to receive the data.Without KafkaIO, it receives the records from a flat data file. I repeated it and it always produced the right result.With including a Beam KarkaIO and embedding exact same code in a anonymous class running Beam pipelines, I get a different result every time I rerun it.Below is the snippet from where KafkaIO executes till a record lands on method.Kafka sends precise number of records. No duplicates. all good.While executing in Beam, when the records are finished & I expect a correct result, it always produces something different. Different in different runs.I appreciate shedding light on this issue.  And thanks for your valuable time as always.Amir-
public static synchronized void main(String[] args) throws Exception {

// Create Beam Options for the Flink Runner. FlinkPipelineOptions options = PipelineOptionsFactory.as( FlinkPipelineOptions.class); // Set the Streaming engine as FlinkRunner options.setRunner( FlinkPipelineRunner.class); // This is a Streaming process (as opposed to Batch=false) options.setStreaming(true); //Create the DAG pipeline for parallel processing of independent LR records Pipeline p = Pipeline.create(options); //Kafka broker topic is identified as "lroad"  List<String> topics = Arrays.asList("lroad");
PCollection<String> kafkarecords = p.apply(KafkaIO.read(). withBootstrapServers(" kafkahost:9092").withTopics( topics). withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply( Values.<String>create()). apply(Window.<String>into( FixedWindows.of(Duration. standardMinutes(1)))           .triggering(AfterWatermark. pastEndOfWindow()). withAllowedLateness(Duration. ZERO)     .accumulatingFiredPanes());               kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new DoFn<String, String>() {                        
                        public void processElement(ProcessContext ctx) throws Exception {

                                        My core logic code here.
}));..p.run(); // Start Beam Pipeline(s) in FlinkC Cluster
} // of main}// of class


Re: Is Beam pipeline runtime behavior inconsistent?

Posted by Thomas Groh <tg...@google.com>.
Missed a ParDo; that last line should be .apply(ParDo.of(new DoFn...))
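Spelled out against the snippet quoted below, the corrected chain would look something like this (a sketch; the DoFn body is a placeholder):

    kafkarecords.apply(WithKeys.<Integer, String>of(1))
        .apply(GroupByKey.<Integer, String>create())
        .apply(Values.<Iterable<String>>create())
        .apply(ParDo.of(new DoFn<Iterable<String>, String>() {
          public void processElement(ProcessContext ctx) throws Exception {
            // per-record logic goes here, unrolling ctx.element() one record at a time
          }
        }));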

On Mon, Aug 8, 2016 at 4:56 PM, amir bahmanyari <am...@yahoo.com> wrote:

> Hi Thomas,
> I used the following & get a compilation error:
> "The method apply(PTransform<? super PCollection<Iterable<String>>,OutputT>)
> in the type PCollection<Iterable<String>> is not applicable for the
> arguments (new DoFn<Iterable<String>,String>(){})"
>
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers(" kafkahost:9092").withTopics( topics).
> withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply(
> Values.<String>create()). apply(Window.<String>into(
> FixedWindows.of(Duration. standardMinutes(1)))
>           .triggering(AfterWatermark. pastEndOfWindow()).
> withAllowedLateness(Duration. ZERO)
>     .discardingFiredPanes());
>
> kafkarecords.apply(WithKeys.<Integer, String>of(1))
>     .apply(GroupByKey.<Integer, String>create())
>     .apply(Values.<Iterable<String>>create())
>     .apply(new DoFn<Iterable<String>, String>() {......
>
>
> I compared with some GroupByKey examples ...nothing that matched it.
> Should I simplify the KafkaIO() call to avoid this compilation error?
> Thanks for your help.
> Amir-
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Monday, August 8, 2016 3:50 PM
>
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> You would performance no better than single-threaded behavior if you group
> everything into a single key, hence why this approach is strongly not
> recommended. You can still get continuous output, depending on the
> triggering, but you lose all of scaling benefits of running a pipeline as
> opposed to a simple Java program, plus may incur some additional overhead.
>
> To enforce this sort of threading you would do something among the lines
> of:
>
> kafkarecords.apply(WithKeys.<Integer, String>of(1))
>     .apply(GroupByKey.<Integer, String>create())
>     .apply(Values.<Iterable<String>>create())
>     .apply(new DoFn<Iterable<String>, String>() {...});
>
> Where the DoFn unrolls its input and on each element applies the
> processing.
>
>
> On Mon, Aug 8, 2016 at 2:37 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Thanks so much Thomas.
> Fantastic answer & great learning about whats really going on underneath
> the hood.
> Have a question on your suggestion: "To do so, you would key the inputs
> to a single static key and apply a GroupByKey, running the processing
> method on the output Iterable produced by the GroupByKey"...
> Wouldn't doing such defeats the "real-time Streaming" objectives?
> To me the above leads to a simulation of a simple single threaded java
> process but its executing in a massively parallel infrastructure in
>  a"fancy" way :-)
> Is there an example that demonstrates how to actually implement your
> suggestion above without any hidden loopholes pls?
> I can at least try it and see how far it gets for R&D purposes & share the
> results with the community.
> Cheers+have a wonderful day.
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 1:44 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> There's no way to guarantee that exactly one record is processed at a
> time. This is part of the design of ParDo to work efficiently across
> multiple processes and machines[1], where multiple instances of a DoFn must
> exist in order for progress to be made in a timely fashion. This includes
> processing the same element across multiple machines at the same time, with
> only one of the results being available in the output PCollection, as well
> as retries of failed elements.
>
> A runner is required to interact with a DoFn instance in a single-threaded
> manner - however, it is permitted to have multiple different DoFn instances
> active within a single process and across processes at any given time (for
> the same reasons as above). There's no support in the Beam model to
> restrict this type of execution. We do not encourage sharing objects
> between DoFn instances, and any shared state must be accessed in a
> thread-safe manner, and modifications to shared state should be idempotent,
> as otherwise retries and speculative execution may cause that state to be
> inconsistent. A DoFn will be reused for multiple elements across a single
> bundle, and may be reused across multiple bundles - if you require the DoFn
> to be "fresh" per element, it should perform any required setup at the
> start of the ProcessElement method.
>
> The best that can be done if it is absolutely required to restrict
> processing to a single element at a time would be to group all of the
> elements to a single key. Note that this will not solve the problem in all
> cases, as a runner is permitted to execute the group of elements multiple
> times so long as it only takes one completed bundle as the result, and
> additionally this removes the ability of the runner to balance work and
> introduces a performance bottleneck. To do so, you would key the inputs to
> a single static key and apply a GroupByKey, running the processing method
> on the output Iterable produced by the GroupByKey (directly; expanding the
> input iterable in a separate PCollection allows a runner to rebalance the
> elements, which will reintroduce parallelism)`.
>
> [1] https://github.com/apache/ incubator-beam/blob/master/
> sdks/java/core/src/main/java/ org/apache/beam/sdk/
> transforms/ParDo.java#L360
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360>
>
> On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Thomas,
> Thanks so much for your response. Here are answers to your questions.
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> ==>> I send records to Kafka from my laptop. I use KafkaIO() to receive
> the records. I have confirmed that I dont get duplicates from Kafka.
> However,
> for some reason, certain parts of my code execute beyond the actual number
> of expected number of records, and subsequently produce extra resulting
> data.
> I tried playing with the Triggering. Stretching the window interval,
> DiscardingFiredPanes etc. all kinds of modes.
> Same.  How can I guarantee that one record at a time executes in one
> unique instance of the inner class object?
> I have all the shared objects synchronized and am using Java concurrent
> hashmaps. How can I guarantee synchronized operations amongst "parallel
> pipelines"? Analogous to multiple threads accessing a shared object and
> trying to modify it...
>
> Here is my current KafkaIO() call:
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers(" kafkahost:9092").withTopics( topics).
> withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply(
> Values.<String>create()). apply(Window.<String>into(
> FixedWindows.of(Duration. standardMinutes(1)))
>          .triggering(AfterWatermark. pastEndOfWindow()).
> withAllowedLateness(Duration. ZERO)
>    .discardingFiredPanes());
>
>     kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new
> DoFn<String, String>() {.//I expect one record at a time to one object here
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to k afkarecords)?
> ==>>No duplicates from Kafka.
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
> ==>>Sorry for my confusing statement. Like I mentioned above, I expect
> each record coming from Kafka gets assigned to one instance of the inner
> class and therefore one instance of the pipeline executed it in parallel
> with others executing their own unique records.
>
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
> ==>>I have not tried DirectRunner. Should I?
>
> Thanks so much Thomas.
>
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 11:43 AM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Just to make sure I understand the problem:
>
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
>
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
>
>
> On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Colleagues,
> I refrained from posting this email before completing thorough testing.
> I think I did.
> My core code works perfect & produces the expect result every single time
> without wrapping it with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated
> it and it always produced the right result.
> With including a Beam KarkaIO and embedding exact same code in a anonymous
> class running Beam pipelines, I get a different result every time I rerun
> it.
> Below is the snippet from where KafkaIO executes till a record lands on
> method.
> Kafka sends precise number of records. No duplicates. all good.
> While executing in Beam, when the records are finished & I expect a
> correct result, it always produces something different.
> Different in different runs.
> I appreciate shedding light on this issue.  And thanks for your valuable
> time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
> // Create Beam Options for the Flink Runner.
> FlinkPipelineOptions options = PipelineOptionsFactory.as(
> FlinkPipelineOptions.class);
> // Set the Streaming engine as FlinkRunner
> options.setRunner( FlinkPipelineRunner.class);
> // This is a Streaming process (as opposed to Batch=false)
> options.setStreaming(true);
> //Create the DAG pipeline for parallel processing of independent LR records
> Pipeline p = Pipeline.create(options);
> //Kafka broker topic is identified as "lroad"
> List<String> topics = Arrays.asList("lroad");
>
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers(" kafkahost:9092").withTopics( topics).
> withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply(
> Values.<String>create()). apply(Window.<String>into(
> FixedWindows.of(Duration. standardMinutes(1)))
>          .triggering(AfterWatermark. pastEndOfWindow()).
> withAllowedLateness(Duration. ZERO)
>    .accumulatingFiredPanes());
>
>     kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new
> DoFn<String, String>() {
>
>                         public void processElement(ProcessContext ctx)
> throws Exception {
>
>                                         *My core logic code here.*
> }));
> .
> .
> p.run(); // Start Beam Pipeline(s) in FlinkC Cluster
> } // of main
> }// of class
>
>
>
>
>
>
>
>
>
>
>

Re: Is Beam pipeline runtime behavior inconsistent?

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Thomas,
I used the following & get a compilation error:
"The method apply(PTransform<? super PCollection<Iterable<String>>,OutputT>) in the type PCollection<Iterable<String>> is not applicable for the arguments (new DoFn<Iterable<String>,String>(){})"

PCollection<String> kafkarecords = p
    .apply(KafkaIO.read()
        .withBootstrapServers("kafkahost:9092")
        .withTopics(topics)
        .withValueCoder(StringUtf8Coder.of())
        .withoutMetadata())
    .apply(Values.<String>create())
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

kafkarecords.apply(WithKeys.<Integer, String>of(1))
    .apply(GroupByKey.<Integer, String>create())
    .apply(Values.<Iterable<String>>create())
    .apply(new DoFn<Iterable<String>, String>() {......

I compared with some GroupByKey examples ... nothing that matched it. Should I simplify the KafkaIO() call to avoid this compilation error?
Thanks for your help.
Amir-

      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 3:50 PM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
You would performance no better than single-threaded behavior if you group everything into a single key, hence why this approach is strongly not recommended. You can still get continuous output, depending on the triggering, but you lose all of scaling benefits of running a pipeline as opposed to a simple Java program, plus may incur some additional overhead.
To enforce this sort of threading you would do something among the lines of:
kafkarecords.apply(WithKeys.<Integer, String>of(1))    .apply(GroupByKey.<Integer, String>create())    .apply(Values.<Iterable<String>>create())    .apply(new DoFn<Iterable<String>, String>() {...});
Where the DoFn unrolls its input and on each element applies the processing.

On Mon, Aug 8, 2016 at 2:37 PM, amir bahmanyari <am...@yahoo.com> wrote:

Thanks so much Thomas. Fantastic answer & great learning about whats really going on underneath the hood.Have a question on your suggestion: "To do so, you would key the inputs to a single static key and apply a GroupByKey, running the processing method on the output Iterable produced by the GroupByKey"...Wouldn't doing such defeats the "real-time Streaming" objectives?To me the above leads to a simulation of a simple single threaded java process but its executing in a massively parallel infrastructure in  a"fancy" way :-)Is there an example that demonstrates how to actually implement your suggestion above without any hidden loopholes pls? I can at least try it and see how far it gets for R&D purposes & share the results with the community.Cheers+have a wonderful day.
      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org ; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 1:44 PM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
There's no way to guarantee that exactly one record is processed at a time. This is part of the design of ParDo to work efficiently across multiple processes and machines[1], where multiple instances of a DoFn must exist in order for progress to be made in a timely fashion. This includes processing the same element across multiple machines at the same time, with only one of the results being available in the output PCollection, as well as retries of failed elements.
A runner is required to interact with a DoFn instance in a single-threaded manner - however, it is permitted to have multiple different DoFn instances active within a single process and across processes at any given time (for the same reasons as above). There's no support in the Beam model to restrict this type of execution. We do not encourage sharing objects between DoFn instances, and any shared state must be accessed in a thread-safe manner, and modifications to shared state should be idempotent, as otherwise retries and speculative execution may cause that state to be inconsistent. A DoFn will be reused for multiple elements across a single bundle, and may be reused across multiple bundles - if you require the DoFn to be "fresh" per element, it should perform any required setup at the start of the ProcessElement method.
The best that can be done if it is absolutely required to restrict processing to a single element at a time would be to group all of the elements to a single key. Note that this will not solve the problem in all cases, as a runner is permitted to execute the group of elements multiple times so long as it only takes one completed bundle as the result, and additionally this removes the ability of the runner to balance work and introduces a performance bottleneck. To do so, you would key the inputs to a single static key and apply a GroupByKey, running the processing method on the output Iterable produced by the GroupByKey (directly; expanding the input iterable in a separate PCollection allows a runner to rebalance the elements, which will reintroduce parallelism)`.
[1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360
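As an illustration of the "fresh per element" advice above (a sketch; the field and helper names are hypothetical, not from the original pipeline, and java.util.Map/HashMap imports are assumed): any state kept in DoFn instance fields is re-initialized at the top of processElement, so reuse of the same DoFn instance across elements and bundles cannot leak state between records.

    kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
      private transient Map<String, Long> perRecordScratch;   // hypothetical per-record helper state

      public void processElement(ProcessContext ctx) throws Exception {
        perRecordScratch = new HashMap<String, Long>();   // reset per element, not per DoFn instance
        String record = ctx.element();
        // ... core logic that reads only ctx.element() and perRecordScratch ...
        ctx.output(record);
      }
    }));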
On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Thomas,Thanks so much for your response. Here are answers to your questions.You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
==>> I send records to Kafka from my laptop. I use KafkaIO() to receive the records. I have confirmed that I dont get duplicates from Kafka. However,for some reason, certain parts of my code execute beyond the actual number of expected number of records, and subsequently produce extra resulting data. I tried playing with the Triggering. Stretching the window interval, DiscardingFiredPanes etc. all kinds of modes.Same.  How can I guarantee that one record at a time executes in one unique instance of the inner class object?I have all the shared objects synchronized and am using Java concurrent hashmaps. How can I guarantee synchronized operations amongst "parallel pipelines"? Analogous to multiple threads accessing a shared object and trying to modify it...
Here is my current KafkaIO() call: PCollection<String> kafkarecords = p.apply(KafkaIO.read(). withBootstrapServers(" kafkahost:9092").withTopics( topics). withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply( Values.<String>create()). apply(Window.<String>into( FixedWindows.of(Duration. standardMinutes(1)))           .triggering(AfterWatermark. pastEndOfWindow()). withAllowedLateness(Duration. ZERO)     .discardingFiredPanes());               kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new DoFn<String, String>() {.//I expect one record at a time to one object here------------------------------ ------------------------------ ------------------------------ ------------------------------ -----------------------
Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to k afkarecords)?==>>No duplicates from Kafka.------------------------------ ------------------------------ ------------------------------ ------------------------------ -----------------------Additionally, I'm not sure what you mean by "executes till a record lands on method"==>>Sorry for my confusing statement. Like I mentioned above, I expect each record coming from Kafka gets assigned to one instance of the inner class and therefore one instance of the pipeline executed it in parallel with others executing their own unique records.
------------------------------ ------------------------------ ------------------------------ ------------------------------ -----------------------
Additionally additionally, is this reproducible if you execute with the DirectRunner? ==>>I have not tried DirectRunner. Should I? 
Thanks so much Thomas.

      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org ; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 11:43 AM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
  
Just to make sure I understand the problem:

You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to kafkarecords)?
Additionally, I'm not sure what you mean by "executes till a record lands on method"
Additionally additionally, is this reproducible if you execute with the DirectRunner?


On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Colleagues,I refrained from posting this email before completing thorough testing.I think I did.My core code works perfect & produces the expect result every single time without wrapping it with Beam KafkaIO to receive the data.Without KafkaIO, it receives the records from a flat data file. I repeated it and it always produced the right result.With including a Beam KarkaIO and embedding exact same code in a anonymous class running Beam pipelines, I get a different result every time I rerun it.Below is the snippet from where KafkaIO executes till a record lands on method.Kafka sends precise number of records. No duplicates. all good.While executing in Beam, when the records are finished & I expect a correct result, it always produces something different. Different in different runs.I appreciate shedding light on this issue.  And thanks for your valuable time as always.Amir-
public static synchronized void main(String[] args) throws Exception {

// Create Beam Options for the Flink Runner. FlinkPipelineOptions options = PipelineOptionsFactory.as( FlinkPipelineOptions.class); // Set the Streaming engine as FlinkRunner options.setRunner( FlinkPipelineRunner.class); // This is a Streaming process (as opposed to Batch=false) options.setStreaming(true); //Create the DAG pipeline for parallel processing of independent LR records Pipeline p = Pipeline.create(options); //Kafka broker topic is identified as "lroad"  List<String> topics = Arrays.asList("lroad");
PCollection<String> kafkarecords = p.apply(KafkaIO.read(). withBootstrapServers(" kafkahost:9092").withTopics( topics). withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply( Values.<String>create()). apply(Window.<String>into( FixedWindows.of(Duration. standardMinutes(1)))           .triggering(AfterWatermark. pastEndOfWindow()). withAllowedLateness(Duration. ZERO)     .accumulatingFiredPanes());               kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new DoFn<String, String>() {                        
                        public void processElement(ProcessContext ctx) throws Exception {

                                        My core logic code here.
}));..p.run(); // Start Beam Pipeline(s) in FlinkC Cluster
} // of main}// of class


Re: Is Beam pipeline runtime behavior inconsistent?

Posted by Thomas Groh <tg...@google.com>.
You would perform no better than single-threaded behavior if you group
everything into a single key, hence why this approach is strongly not
recommended. You can still get continuous output, depending on the
triggering, but you lose all of scaling benefits of running a pipeline as
opposed to a simple Java program, plus may incur some additional overhead.

To enforce this sort of threading you would do something along the lines of:

kafkarecords.apply(WithKeys.<Integer, String>of(1))
    .apply(GroupByKey.<Integer, String>create())
    .apply(Values.<Iterable<String>>create())
    .apply(new DoFn<Iterable<String>, String>() {...});

Where the DoFn unrolls its input and on each element applies the processing.
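Concretely, the unrolling DoFn could look something like this (a sketch; the per-record logic is a placeholder):

    new DoFn<Iterable<String>, String>() {
      public void processElement(ProcessContext ctx) throws Exception {
        // The single grouped element is the Iterable of every record for the key;
        // walk it one record at a time, exactly as the single-threaded code would.
        for (String record : ctx.element()) {
          // ... core per-record logic here ...
          ctx.output(record);
        }
      }
    }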


On Mon, Aug 8, 2016 at 2:37 PM, amir bahmanyari <am...@yahoo.com> wrote:

> Thanks so much Thomas.
> Fantastic answer & great learning about whats really going on underneath
> the hood.
> Have a question on your suggestion: "To do so, you would key the inputs
> to a single static key and apply a GroupByKey, running the processing
> method on the output Iterable produced by the GroupByKey"...
> Wouldn't doing such defeats the "real-time Streaming" objectives?
> To me the above leads to a simulation of a simple single threaded java
> process but its executing in a massively parallel infrastructure in
>  a"fancy" way :-)
> Is there an example that demonstrates how to actually implement your
> suggestion above without any hidden loopholes pls?
> I can at least try it and see how far it gets for R&D purposes & share the
> results with the community.
> Cheers+have a wonderful day.
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Monday, August 8, 2016 1:44 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> There's no way to guarantee that exactly one record is processed at a
> time. This is part of the design of ParDo to work efficiently across
> multiple processes and machines[1], where multiple instances of a DoFn must
> exist in order for progress to be made in a timely fashion. This includes
> processing the same element across multiple machines at the same time, with
> only one of the results being available in the output PCollection, as well
> as retries of failed elements.
>
> A runner is required to interact with a DoFn instance in a single-threaded
> manner - however, it is permitted to have multiple different DoFn instances
> active within a single process and across processes at any given time (for
> the same reasons as above). There's no support in the Beam model to
> restrict this type of execution. We do not encourage sharing objects
> between DoFn instances, and any shared state must be accessed in a
> thread-safe manner, and modifications to shared state should be idempotent,
> as otherwise retries and speculative execution may cause that state to be
> inconsistent. A DoFn will be reused for multiple elements across a single
> bundle, and may be reused across multiple bundles - if you require the DoFn
> to be "fresh" per element, it should perform any required setup at the
> start of the ProcessElement method.
>
> The best that can be done if it is absolutely required to restrict
> processing to a single element at a time would be to group all of the
> elements to a single key. Note that this will not solve the problem in all
> cases, as a runner is permitted to execute the group of elements multiple
> times so long as it only takes one completed bundle as the result, and
> additionally this removes the ability of the runner to balance work and
> introduces a performance bottleneck. To do so, you would key the inputs to
> a single static key and apply a GroupByKey, running the processing method
> on the output Iterable produced by the GroupByKey (directly; expanding the
> input iterable in a separate PCollection allows a runner to rebalance the
> elements, which will reintroduce parallelism)`.
>
> [1] https://github.com/apache/ incubator-beam/blob/master/
> sdks/java/core/src/main/java/ org/apache/beam/sdk/
> transforms/ParDo.java#L360
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360>
>
> On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Thomas,
> Thanks so much for your response. Here are answers to your questions.
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> ==>> I send records to Kafka from my laptop. I use KafkaIO() to receive
> the records. I have confirmed that I dont get duplicates from Kafka.
> However,
> for some reason, certain parts of my code execute beyond the actual number
> of expected number of records, and subsequently produce extra resulting
> data.
> I tried playing with the Triggering. Stretching the window interval,
> DiscardingFiredPanes etc. all kinds of modes.
> Same.  How can I guarantee that one record at a time executes in one
> unique instance of the inner class object?
> I have all the shared objects synchronized and am using Java concurrent
> hashmaps. How can I guarantee synchronized operations amongst "parallel
> pipelines"? Analogous to multiple threads accessing a shared object and
> trying to modify it...
>
> Here is my current KafkaIO() call:
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers(" kafkahost:9092").withTopics( topics).
> withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply(
> Values.<String>create()). apply(Window.<String>into(
> FixedWindows.of(Duration. standardMinutes(1)))
>          .triggering(AfterWatermark. pastEndOfWindow()).
> withAllowedLateness(Duration. ZERO)
>    .discardingFiredPanes());
>
>     kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new
> DoFn<String, String>() {.//I expect one record at a time to one object here
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to k afkarecords)?
> ==>>No duplicates from Kafka.
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
> ==>>Sorry for my confusing statement. Like I mentioned above, I expect
> each record coming from Kafka gets assigned to one instance of the inner
> class and therefore one instance of the pipeline executed it in parallel
> with others executing their own unique records.
>
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
> ==>>I have not tried DirectRunner. Should I?
>
> Thanks so much Thomas.
>
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 11:43 AM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Just to make sure I understand the problem:
>
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
>
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
>
>
> On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Colleagues,
> I refrained from posting this email before completing thorough testing.
> I think I did.
> My core code works perfect & produces the expect result every single time
> without wrapping it with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated
> it and it always produced the right result.
> With including a Beam KarkaIO and embedding exact same code in a anonymous
> class running Beam pipelines, I get a different result every time I rerun
> it.
> Below is the snippet from where KafkaIO executes till a record lands on
> method.
> Kafka sends precise number of records. No duplicates. all good.
> While executing in Beam, when the records are finished & I expect a
> correct result, it always produces something different.
> Different in different runs.
> I appreciate shedding light on this issue.  And thanks for your valuable
> time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
> // Create Beam Options for the Flink Runner.
> FlinkPipelineOptions options = PipelineOptionsFactory.as(
> FlinkPipelineOptions.class);
> // Set the Streaming engine as FlinkRunner
> options.setRunner( FlinkPipelineRunner.class);
> // This is a Streaming process (as opposed to Batch=false)
> options.setStreaming(true);
> //Create the DAG pipeline for parallel processing of independent LR records
> Pipeline p = Pipeline.create(options);
> //Kafka broker topic is identified as "lroad"
> List<String> topics = Arrays.asList("lroad");
>
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers(" kafkahost:9092").withTopics( topics).
> withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply(
> Values.<String>create()). apply(Window.<String>into(
> FixedWindows.of(Duration. standardMinutes(1)))
>          .triggering(AfterWatermark. pastEndOfWindow()).
> withAllowedLateness(Duration. ZERO)
>    .accumulatingFiredPanes());
>
>     kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new
> DoFn<String, String>() {
>
>                         public void processElement(ProcessContext ctx)
> throws Exception {
>
>                                         *My core logic code here.*
> }));
> .
> .
> p.run(); // Start Beam Pipeline(s) in FlinkC Cluster
> } // of main
> }// of class
>
>
>
>
>
>
>
>

Re: Is Beam pipeline runtime behavior inconsistent?

Posted by amir bahmanyari <am...@yahoo.com>.
Thanks so much Thomas. Fantastic answer & great learning about what's really going on underneath the hood.
Have a question on your suggestion: "To do so, you would key the inputs to a single static key and apply a GroupByKey, running the processing method on the output Iterable produced by the GroupByKey"...
Wouldn't doing such defeat the "real-time Streaming" objectives?
To me the above leads to a simulation of a simple single-threaded Java process, but executing in a massively parallel infrastructure in a "fancy" way :-)
Is there an example that demonstrates how to actually implement your suggestion above without any hidden loopholes pls?
I can at least try it and see how far it gets for R&D purposes & share the results with the community.
Cheers+have a wonderful day.
      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 1:44 PM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
There's no way to guarantee that exactly one record is processed at a time. This is part of the design of ParDo to work efficiently across multiple processes and machines[1], where multiple instances of a DoFn must exist in order for progress to be made in a timely fashion. This includes processing the same element across multiple machines at the same time, with only one of the results being available in the output PCollection, as well as retries of failed elements.
A runner is required to interact with a DoFn instance in a single-threaded manner - however, it is permitted to have multiple different DoFn instances active within a single process and across processes at any given time (for the same reasons as above). There's no support in the Beam model to restrict this type of execution. We do not encourage sharing objects between DoFn instances, and any shared state must be accessed in a thread-safe manner, and modifications to shared state should be idempotent, as otherwise retries and speculative execution may cause that state to be inconsistent. A DoFn will be reused for multiple elements across a single bundle, and may be reused across multiple bundles - if you require the DoFn to be "fresh" per element, it should perform any required setup at the start of the ProcessElement method.
The best that can be done if it is absolutely required to restrict processing to a single element at a time would be to group all of the elements to a single key. Note that this will not solve the problem in all cases, as a runner is permitted to execute the group of elements multiple times so long as it only takes one completed bundle as the result, and additionally this removes the ability of the runner to balance work and introduces a performance bottleneck. To do so, you would key the inputs to a single static key and apply a GroupByKey, running the processing method on the output Iterable produced by the GroupByKey (directly; expanding the input iterable in a separate PCollection allows a runner to rebalance the elements, which will reintroduce parallelism)`.
[1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360
On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Thomas,Thanks so much for your response. Here are answers to your questions.You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
==>> I send records to Kafka from my laptop. I use KafkaIO() to receive the records. I have confirmed that I dont get duplicates from Kafka. However,for some reason, certain parts of my code execute beyond the actual number of expected number of records, and subsequently produce extra resulting data. I tried playing with the Triggering. Stretching the window interval, DiscardingFiredPanes etc. all kinds of modes.Same.  How can I guarantee that one record at a time executes in one unique instance of the inner class object?I have all the shared objects synchronized and am using Java concurrent hashmaps. How can I guarantee synchronized operations amongst "parallel pipelines"? Analogous to multiple threads accessing a shared object and trying to modify it...
Here is my current KafkaIO() call: PCollection<String> kafkarecords = p.apply(KafkaIO.read(). withBootstrapServers(" kafkahost:9092").withTopics( topics). withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply( Values.<String>create()). apply(Window.<String>into( FixedWindows.of(Duration. standardMinutes(1)))           .triggering(AfterWatermark. pastEndOfWindow()). withAllowedLateness(Duration. ZERO)     .discardingFiredPanes());               kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new DoFn<String, String>() {.//I expect one record at a time to one object here------------------------------ ------------------------------ ------------------------------ ------------------------------ -----------------------
Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to k afkarecords)?==>>No duplicates from Kafka.------------------------------ ------------------------------ ------------------------------ ------------------------------ -----------------------Additionally, I'm not sure what you mean by "executes till a record lands on method"==>>Sorry for my confusing statement. Like I mentioned above, I expect each record coming from Kafka gets assigned to one instance of the inner class and therefore one instance of the pipeline executed it in parallel with others executing their own unique records.
------------------------------ ------------------------------ ------------------------------ ------------------------------ -----------------------
Additionally additionally, is this reproducible if you execute with the DirectRunner? ==>>I have not tried DirectRunner. Should I? 
Thanks so much Thomas.

      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org ; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 11:43 AM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
  
Just to make sure I understand the problem:

You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to kafkarecords)?
Additionally, I'm not sure what you mean by "executes till a record lands on method"
Additionally additionally, is this reproducible if you execute with the DirectRunner?


On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Colleagues,I refrained from posting this email before completing thorough testing.I think I did.My core code works perfect & produces the expect result every single time without wrapping it with Beam KafkaIO to receive the data.Without KafkaIO, it receives the records from a flat data file. I repeated it and it always produced the right result.With including a Beam KarkaIO and embedding exact same code in a anonymous class running Beam pipelines, I get a different result every time I rerun it.Below is the snippet from where KafkaIO executes till a record lands on method.Kafka sends precise number of records. No duplicates. all good.While executing in Beam, when the records are finished & I expect a correct result, it always produces something different. Different in different runs.I appreciate shedding light on this issue.  And thanks for your valuable time as always.Amir-
public static synchronized void main(String[] args) throws Exception {

// Create Beam Options for the Flink Runner. FlinkPipelineOptions options = PipelineOptionsFactory.as( FlinkPipelineOptions.class); // Set the Streaming engine as FlinkRunner options.setRunner( FlinkPipelineRunner.class); // This is a Streaming process (as opposed to Batch=false) options.setStreaming(true); //Create the DAG pipeline for parallel processing of independent LR records Pipeline p = Pipeline.create(options); //Kafka broker topic is identified as "lroad"  List<String> topics = Arrays.asList("lroad");
PCollection<String> kafkarecords = p.apply(KafkaIO.read(). withBootstrapServers(" kafkahost:9092").withTopics( topics). withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply( Values.<String>create()). apply(Window.<String>into( FixedWindows.of(Duration. standardMinutes(1)))           .triggering(AfterWatermark. pastEndOfWindow()). withAllowedLateness(Duration. ZERO)     .accumulatingFiredPanes());               kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new DoFn<String, String>() {                        
                        public void processElement(ProcessContext ctx) throws Exception {

                                        My core logic code here.
}));..p.run(); // Start Beam Pipeline(s) in FlinkC Cluster
} // of main}// of class


Re: ************* Is Beam pipeline runtime behavior inconsistent?

Posted by Dan Halperin <dh...@google.com>.
Hi Amir,

There seems to be a bit of a rathole here that is hard to talk through over
email.

Are you willing to share a runnable test? Ideally you would send a tarball
or post a runnable pipeline on GitHub along with clear instructions to
reproduce. Assume we can set up our own local Kafka node.

Dan

On Tue, Aug 9, 2016 at 9:24 PM, amir bahmanyari <am...@yahoo.com> wrote:

> Hi Thomas,
> I removed the KafkaIO() call, and replaced it with TextIO() reading data
> records from file system.
> *Works perfect* :-( Not sure to be happy or sad...all this time I proved
> kafka itself was not sending duplicate records.
> But it seems like KafkaIO() has the brain of its own.
>
> Bottom-line: The difference is KafkaIO().....Its probably intermittently
> sending duplicates which I could not catch during my testing.
>
> Anyone can suggest a way to prevent KafkaIO() from re-sending to
> processElement() pls?
> Thanks.
>
> p.apply(TextIO.Read.from("/tmp/10m1x1K.dat"))
> .apply("PseudoLRDoFn", ParDo.of(new DoFn<String, String>() {
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Tuesday, August 9, 2016 1:22 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> ConcurrentHashMaps can be interacted with in a way that does not preserve
> the intended semantics. If you are using exclusively atomic mutation
> operations (putIfAbsent(K, V), remove(K, V), replace(K, V, V)), you can
> ensure that the mutation semantics are obtained; however, using a
> ConcurrentMap purely like a map can cause Time-of-check-time-of-use errors.
> Otherwise, ConcurrentMaps provide happens-before and visibility guarantees
> only.
>
> For the second question, this is mainly about interacting with mutable
> per-element state - if you interact with, for example, mutable instance
> fields that have a base and a current state, the base state must be reset
> per-element. It doesn't sound like this is your problem.
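To illustrate the distinction Thomas draws above about atomic mutation operations (a sketch; the class, map, and key names are hypothetical, not from the original code): a plain get-then-put on a ConcurrentHashMap is a time-of-check-to-time-of-use race, while a retry loop over putIfAbsent/replace keeps the update atomic.

    import java.util.concurrent.ConcurrentHashMap;

    class SharedCounts {
      private static final ConcurrentHashMap<String, Long> COUNTS = new ConcurrentHashMap<String, Long>();

      // Racy: two DoFn instances can read the same old value and one increment is lost.
      static void incrementRacy(String key) {
        Long old = COUNTS.get(key);                      // time of check ...
        COUNTS.put(key, (old == null ? 0L : old) + 1);   // ... time of use
      }

      // Atomic: retry with the putIfAbsent/replace operations named above until the swap succeeds.
      static void incrementAtomic(String key) {
        while (true) {
          Long current = COUNTS.putIfAbsent(key, 1L);
          if (current == null) {
            return;                                      // first writer for this key
          }
          if (COUNTS.replace(key, current, current + 1)) {
            return;                                      // compare-and-swap succeeded
          }
        }
      }
    }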
>
> On Tue, Aug 9, 2016 at 11:31 AM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Thomas,
> I spent time to digest all of this. I think I understand it to a good
> extent.
> The only hang up I still have is controlling the execution trajectory with
> persisting state which you say its not guaranteed in Beam.
> Have some further questions* Q* below & appreciate your valuable time to
> respond to them. I reiterated your statements in " " for quick reference
> above them.
>
> "We do not encourage sharing objects between DoFn instances, and any
> shared state must be accessed in a thread-safe manner, and modifications to
> shared state should be idempotent, as otherwise retries and speculative
> execution may cause that state to be inconsistent."
> *Q*: I persisted state in (single instance) Redis. I got varying result
> at each run.
> I then replaced Redis with java (static) ConcurrentHashMaps which are
> automatically thread safe. Interesting enough, the very first run after
> this change produced precise result & I thought I GOT IT! Re-run, and I got
> varying results again till this moment I am typing this email. How would
> you suggest to "any shared state must be accessed in a thread-safe manner"
> different than using Concurrent HashMaps?
>
>
> "A DoFn will be reused for multiple elements across a single bundle, and
> may be reused across multiple bundles - if you require the DoFn to be
> "fresh" per element, it should perform any required setup at the start of
> the ProcessElement method."
> *Q*: What do you suggest to "it should perform any required setup at the
> start of the ProcessElement method."?
> I can think of persisting the DoFn Obj's HashCode at the Object class
> level (every-time ProcessElement is invoked)  & compare it later on for
> uniqueness with Object's equals(Obj). It gets a little hairy when
> "parallelism" manifests in execution I know.
> I appreciate your suggestions.
>
>
> Thanks+have a great day.
> Amir
>
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 1:44 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> There's no way to guarantee that exactly one record is processed at a
> time. This is part of the design of ParDo to work efficiently across
> multiple processes and machines[1], where multiple instances of a DoFn must
> exist in order for progress to be made in a timely fashion. This includes
> processing the same element across multiple machines at the same time, with
> only one of the results being available in the output PCollection, as well
> as retries of failed elements.
>
> A runner is required to interact with a DoFn instance in a single-threaded
> manner - however, it is permitted to have multiple different DoFn instances
> active within a single process and across processes at any given time (for
> the same reasons as above). There's no support in the Beam model to
> restrict this type of execution. We do not encourage sharing objects
> between DoFn instances, and any shared state must be accessed in a
> thread-safe manner, and modifications to shared state should be idempotent,
> as otherwise retries and speculative execution may cause that state to be
> inconsistent. A DoFn will be reused for multiple elements across a single
> bundle, and may be reused across multiple bundles - if you require the DoFn
> to be "fresh" per element, it should perform any required setup at the
> start of the ProcessElement method.
>
> The best that can be done if it is absolutely required to restrict
> processing to a single element at a time would be to group all of the
> elements to a single key. Note that this will not solve the problem in all
> cases, as a runner is permitted to execute the group of elements multiple
> times so long as it only takes one completed bundle as the result, and
> additionally this removes the ability of the runner to balance work and
> introduces a performance bottleneck. To do so, you would key the inputs to
> a single static key and apply a GroupByKey, running the processing method
> on the output Iterable produced by the GroupByKey (directly; expanding the
> input iterable in a separate PCollection allows a runner to rebalance the
> elements, which will reintroduce parallelism)`.
>
> [1] https://github.com/apache/ incubator-beam/blob/master/
> sdks/java/core/src/main/java/ org/apache/beam/sdk/
> transforms/ParDo.java#L360
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360>
>
> On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Thomas,
> Thanks so much for your response. Here are answers to your questions.
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> ==>> I send records to Kafka from my laptop. I use KafkaIO() to receive
> the records. I have confirmed that I dont get duplicates from Kafka.
> However,
> for some reason, certain parts of my code execute beyond the actual number
> of expected number of records, and subsequently produce extra resulting
> data.
> I tried playing with the Triggering. Stretching the window interval,
> DiscardingFiredPanes etc. all kinds of modes.
> Same.  How can I guarantee that one record at a time executes in one
> unique instance of the inner class object?
> I have all the shared objects synchronized and am using Java concurrent
> hashmaps. How can I guarantee synchronized operations amongst "parallel
> pipelines"? Analogous to multiple threads accessing a shared object and
> trying to modify it...
>
> Here is my current KafkaIO() call:
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers(" kafkahost:9092").withTopics( topics).
> withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply(
> Values.<String>create()). apply(Window.<String>into(
> FixedWindows.of(Duration. standardMinutes(1)))
>          .triggering(AfterWatermark. pastEndOfWindow()).
> withAllowedLateness(Duration. ZERO)
>    .discardingFiredPanes());
>
>     kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new
> DoFn<String, String>() {.//I expect one record at a time to one object here
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to k afkarecords)?
> ==>>No duplicates from Kafka.
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
> ==>>Sorry for my confusing statement. Like I mentioned above, I expect
> each record coming from Kafka gets assigned to one instance of the inner
> class and therefore one instance of the pipeline executed it in parallel
> with others executing their own unique records.
>
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
> ==>>I have not tried DirectRunner. Should I?
>
> Thanks so much Thomas.
>
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 11:43 AM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Just to make sure I understand the problem:
>
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
>
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
>
>
> On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Colleagues,
> I refrained from posting this email before completing thorough testing.
> I think I did.
> My core code works perfect & produces the expect result every single time
> without wrapping it with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated
> it and it always produced the right result.
> With including a Beam KarkaIO and embedding exact same code in a anonymous
> class running Beam pipelines, I get a different result every time I rerun
> it.
> Below is the snippet from where KafkaIO executes till a record lands on
> method.
> Kafka sends precise number of records. No duplicates. all good.
> While executing in Beam, when the records are finished & I expect a
> correct result, it always produces something different.
> Different in different runs.
> I appreciate shedding light on this issue.  And thanks for your valuable
> time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
> // Create Beam Options for the Flink Runner.
> FlinkPipelineOptions options = PipelineOptionsFactory.as(
> FlinkPipelineOptions.class);
> // Set the Streaming engine as FlinkRunner
> options.setRunner( FlinkPipelineRunner.class);
> // This is a Streaming process (as opposed to Batch=false)
> options.setStreaming(true);
> //Create the DAG pipeline for parallel processing of independent LR records
> Pipeline p = Pipeline.create(options);
> //Kafka broker topic is identified as "lroad"
> List<String> topics = Arrays.asList("lroad");
>
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers(" kafkahost:9092").withTopics( topics).
> withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply(
> Values.<String>create()). apply(Window.<String>into(
> FixedWindows.of(Duration. standardMinutes(1)))
>          .triggering(AfterWatermark. pastEndOfWindow()).
> withAllowedLateness(Duration. ZERO)
>    .accumulatingFiredPanes());
>
>     kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new
> DoFn<String, String>() {
>
>                         public void processElement(ProcessContext ctx)
> throws Exception {
>
>                                         *My core logic code here.*
> }));
> .
> .
> p.run(); // Start Beam Pipeline(s) in FlinkC Cluster
> } // of main
> }// of class

Re: ************* Is Beam pipeline runtime behavior inconsistent?

Posted by Raghu Angadi <ra...@google.com>.
There could very well be a bug in KafkaIO, but what you have described so
far does not necessarily show that. Please file a JIRA with details on how
to reproduce the problem.

A couple of things you can try:

   - Run your pipeline with 10 files for TextIO.
   - With KafkaIO, remove your DoFn and instead run something like this:

      pipeline.apply(KafkaIO.read()
                  // ...bootstrap servers, topics, value coder as in your pipeline...
                  .withMaxReadTime(Duration.standardMinutes(10))
                  .withoutMetadata())
              .apply(Values.<String>create())
              .apply(Count.<String>globally())
              .apply(ParDo.of(new DoFn<Long, Void>() {
                 @Override
                 public void processElement(ProcessContext ctx) {
                   LOG.info("Read {} records from Kafka", ctx.element());
                 }
              }));
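
For reference, a self-contained sketch of that counting diagnostic. This is a sketch only: the package names are assumptions, and the KafkaIO builder methods are taken from the snippets in this thread, so both may need adjusting for your Beam version.

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CountKafkaRecords {
  private static final Logger LOG = LoggerFactory.getLogger(CountKafkaRecords.class);

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(KafkaIO.read()
            .withBootstrapServers("kafkahost:9092")        // broker name from this thread
            .withTopics(Arrays.asList("lroad"))            // topic name from this thread
            .withValueCoder(StringUtf8Coder.of())
            .withMaxReadTime(Duration.standardMinutes(10)) // makes the unbounded read finite
            .withoutMetadata())
        .apply(Values.<String>create())
        .apply(Count.<String>globally())
        .apply(ParDo.of(new DoFn<Long, Void>() {
          @Override
          public void processElement(ProcessContext ctx) {
            // Log the total number of records seen; compare with what was published.
            LOG.info("Read {} records from Kafka", ctx.element());
          }
        }));

    p.run();
  }
}

If the logged count matches what was published, the duplication is happening downstream of the read; if it does not, that points at the read itself.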


On Tue, Aug 9, 2016 at 9:24 PM, amir bahmanyari <am...@yahoo.com> wrote:

> Hi Thomas,
> I removed the KafkaIO() call, and replaced it with TextIO() reading data
> records from file system.
> *Works perfect* :-( Not sure to be happy or sad...all this time I proved
> kafka itself was not sending duplicate records.
> But it seems like KafkaIO() has the brain of its own.
>
> Bottom-line: The difference is KafkaIO().....Its probably intermittently
> sending duplicates which I could not catch during my testing.
>
> Anyone can suggest a way to prevent KafkaIO() from re-sending to
> processElement() pls?
> Thanks.
>
> p.apply(TextIO.Read.from("/tmp/10m1x1K.dat"))
> .apply("PseudoLRDoFn", ParDo.of(new DoFn<String, String>() {
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Tuesday, August 9, 2016 1:22 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> ConcurrentHashMaps can be interacted with in a way that does not preserve
> the intended semantics. If you are using exclusively atomic mutation
> operations (putIfAbsent(K, V), remove(K, V), replace(K, V, V)), you can
> ensure that the mutation semantics are obtained; however, using a
> ConcurrentMap purely like a map can cause Time-of-check-time-of-use errors.
> Otherwise, ConcurrentMaps provide happens-before and visibility guarantees
> only.
>
> For the second question, this is mainly about interacting with mutable
> per-element state - if you interact with, for example, mutable instance
> fields that have a base and a current state, the base state must be reset
> per-element. It doesn't sound like this is your problem.
>
> On Tue, Aug 9, 2016 at 11:31 AM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Thomas,
> I spent time to digest all of this. I think I understand it to a good
> extent.
> The only hang up I still have is controlling the execution trajectory with
> persisting state which you say its not guaranteed in Beam.
> Have some further questions* Q* below & appreciate your valuable time to
> respond to them. I reiterated your statements in " " for quick reference
> above them.
>
> "We do not encourage sharing objects between DoFn instances, and any
> shared state must be accessed in a thread-safe manner, and modifications to
> shared state should be idempotent, as otherwise retries and speculative
> execution may cause that state to be inconsistent."
> *Q*: I persisted state in (single instance) Redis. I got varying result
> at each run.
> I then replaced Redis with java (static) ConcurrentHashMaps which are
> automatically thread safe. Interesting enough, the very first run after
> this change produced precise result & I thought I GOT IT! Re-run, and I got
> varying results again till this moment I am typing this email. How would
> you suggest to "any shared state must be accessed in a thread-safe manner"
> different than using Concurrent HashMaps?
>
>
> "A DoFn will be reused for multiple elements across a single bundle, and
> may be reused across multiple bundles - if you require the DoFn to be
> "fresh" per element, it should perform any required setup at the start of
> the ProcessElement method."
> *Q*: What do you suggest to "it should perform any required setup at the
> start of the ProcessElement method."?
> I can think of persisting the DoFn Obj's HashCode at the Object class
> level (every-time ProcessElement is invoked)  & compare it later on for
> uniqueness with Object's equals(Obj). It gets a little hairy when
> "parallelism" manifests in execution I know.
> I appreciate your suggestions.
>
>
> Thanks+have a great day.
> Amir
>
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 1:44 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> There's no way to guarantee that exactly one record is processed at a
> time. This is part of the design of ParDo to work efficiently across
> multiple processes and machines[1], where multiple instances of a DoFn must
> exist in order for progress to be made in a timely fashion. This includes
> processing the same element across multiple machines at the same time, with
> only one of the results being available in the output PCollection, as well
> as retries of failed elements.
>
> A runner is required to interact with a DoFn instance in a single-threaded
> manner - however, it is permitted to have multiple different DoFn instances
> active within a single process and across processes at any given time (for
> the same reasons as above). There's no support in the Beam model to
> restrict this type of execution. We do not encourage sharing objects
> between DoFn instances, and any shared state must be accessed in a
> thread-safe manner, and modifications to shared state should be idempotent,
> as otherwise retries and speculative execution may cause that state to be
> inconsistent. A DoFn will be reused for multiple elements across a single
> bundle, and may be reused across multiple bundles - if you require the DoFn
> to be "fresh" per element, it should perform any required setup at the
> start of the ProcessElement method.
>
> The best that can be done if it is absolutely required to restrict
> processing to a single element at a time would be to group all of the
> elements to a single key. Note that this will not solve the problem in all
> cases, as a runner is permitted to execute the group of elements multiple
> times so long as it only takes one completed bundle as the result, and
> additionally this removes the ability of the runner to balance work and
> introduces a performance bottleneck. To do so, you would key the inputs to
> a single static key and apply a GroupByKey, running the processing method
> on the output Iterable produced by the GroupByKey (directly; expanding the
> input iterable in a separate PCollection allows a runner to rebalance the
> elements, which will reintroduce parallelism)`.
>
> [1] https://github.com/apache/ incubator-beam/blob/master/
> sdks/java/core/src/main/java/ org/apache/beam/sdk/
> transforms/ParDo.java#L360
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360>
>
> On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Thomas,
> Thanks so much for your response. Here are answers to your questions.
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> ==>> I send records to Kafka from my laptop. I use KafkaIO() to receive
> the records. I have confirmed that I dont get duplicates from Kafka.
> However,
> for some reason, certain parts of my code execute beyond the actual number
> of expected number of records, and subsequently produce extra resulting
> data.
> I tried playing with the Triggering. Stretching the window interval,
> DiscardingFiredPanes etc. all kinds of modes.
> Same.  How can I guarantee that one record at a time executes in one
> unique instance of the inner class object?
> I have all the shared objects synchronized and am using Java concurrent
> hashmaps. How can I guarantee synchronized operations amongst "parallel
> pipelines"? Analogous to multiple threads accessing a shared object and
> trying to modify it...
>
> Here is my current KafkaIO() call:
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers(" kafkahost:9092").withTopics( topics).
> withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply(
> Values.<String>create()). apply(Window.<String>into(
> FixedWindows.of(Duration. standardMinutes(1)))
>          .triggering(AfterWatermark. pastEndOfWindow()).
> withAllowedLateness(Duration. ZERO)
>    .discardingFiredPanes());
>
>     kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new
> DoFn<String, String>() {.//I expect one record at a time to one object here
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to k afkarecords)?
> ==>>No duplicates from Kafka.
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
> ==>>Sorry for my confusing statement. Like I mentioned above, I expect
> each record coming from Kafka gets assigned to one instance of the inner
> class and therefore one instance of the pipeline executed it in parallel
> with others executing their own unique records.
>
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
> ==>>I have not tried DirectRunner. Should I?
>
> Thanks so much Thomas.
>
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 11:43 AM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Just to make sure I understand the problem:
>
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
>
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
>
>
> On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Colleagues,
> I refrained from posting this email before completing thorough testing.
> I think I did.
> My core code works perfect & produces the expect result every single time
> without wrapping it with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated
> it and it always produced the right result.
> With including a Beam KarkaIO and embedding exact same code in a anonymous
> class running Beam pipelines, I get a different result every time I rerun
> it.
> Below is the snippet from where KafkaIO executes till a record lands on
> method.
> Kafka sends precise number of records. No duplicates. all good.
> While executing in Beam, when the records are finished & I expect a
> correct result, it always produces something different.
> Different in different runs.
> I appreciate shedding light on this issue.  And thanks for your valuable
> time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
> // Create Beam Options for the Flink Runner.
> FlinkPipelineOptions options = PipelineOptionsFactory.as(
> FlinkPipelineOptions.class);
> // Set the Streaming engine as FlinkRunner
> options.setRunner( FlinkPipelineRunner.class);
> // This is a Streaming process (as opposed to Batch=false)
> options.setStreaming(true);
> //Create the DAG pipeline for parallel processing of independent LR records
> Pipeline p = Pipeline.create(options);
> //Kafka broker topic is identified as "lroad"
> List<String> topics = Arrays.asList("lroad");
>
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers(" kafkahost:9092").withTopics( topics).
> withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply(
> Values.<String>create()). apply(Window.<String>into(
> FixedWindows.of(Duration. standardMinutes(1)))
>          .triggering(AfterWatermark. pastEndOfWindow()).
> withAllowedLateness(Duration. ZERO)
>    .accumulatingFiredPanes());
>
>     kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new
> DoFn<String, String>() {
>
>                         public void processElement(ProcessContext ctx)
> throws Exception {
>
>                                         *My core logic code here.*
> }));
> .
> .
> p.run(); // Start Beam Pipeline(s) in FlinkC Cluster
> } // of main
> }// of class
>
>
>
>

Re: ************* Is Beam pipeline runtime behavior inconsistent?

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Thomas,

I removed the KafkaIO() call and replaced it with TextIO() reading data records from the file system.
Works perfect :-( Not sure whether to be happy or sad... all this time I proved kafka itself was not sending duplicate records.
But it seems like KafkaIO() has a brain of its own.

Bottom line: the difference is KafkaIO(). It's probably intermittently sending duplicates which I could not catch during my testing.

Can anyone suggest a way to prevent KafkaIO() from re-sending to processElement() pls?
Thanks.

p.apply(TextIO.Read.from("/tmp/10m1x1K.dat"))
  .apply("PseudoLRDoFn", ParDo.of(new DoFn<String, String>() {
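
A minimal diagnostic sketch (not from this thread; it assumes the kafkarecords PCollection<String>, an SLF4J LOG, and org.apache.beam.sdk.values.KV as used elsewhere here) that would log any record value delivered more than once within a window pane:

kafkarecords
    .apply(Count.<String>perElement())
    .apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
      @Override
      public void processElement(ProcessContext ctx) {
        // Any value with a count above 1 was delivered more than once in this pane.
        if (ctx.element().getValue() > 1) {
          LOG.warn("Record seen {} times: {}",
              ctx.element().getValue(), ctx.element().getKey());
        }
      }
    }));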

Re: Is Beam pipeline runtime behavior inconsistent?

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Raghu,

I monitored what gets sent from Kafka to a Kafka Consumer instance. The data is out of sequence compared to what is in the source file I read line by line and send to Kafka.

Here is how I created the topic: one partition only, so EVERYTHING goes into that 1 partition and, according to the Kafka docs, it is guaranteed to be ordered as it was sent.

./kafka-topics.sh --zookeeper kafhost:2181 --create --topic lrroad --partitions 1 --replication-factor 1

This is a good link: https://kafka.apache.org/08/introduction.html

Please see the following statement. It tells me to create a topic with only 1 partition, as in the command line above. Is there anything else I should do besides creating the topic with 1 partition to get total order in that 1 partition? At the moment, I don't care about parallelism.

"Kafka only provides a total order over messages within a partition. This combined with the ability to partition data by key is sufficient for the vast majority of applications. However, if you require a total order over messages this can be achieved with a topic that has only one partition, though this will mean only one consumer process."

And I have only one consumer: my FlinkRunner app running in a 2-node Flink cluster.
Thanks for your response Raghu... Cheers
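
For illustration, a minimal single-threaded, synchronous producer sketch using the standard Kafka producer API (host, file, and topic names here are placeholders taken from this thread; adjust to your setup). Sending one record at a time and blocking on each send keeps a single request in flight, so a one-partition topic receives the records in file order:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrderedPublisher {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafkahost:9092");   // placeholder broker
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
         BufferedReader reader = new BufferedReader(new FileReader("/tmp/10m1x1K.dat"))) {
      String line;
      while ((line = reader.readLine()) != null) {
        // Block on each send so only one record is in flight and
        // retries cannot silently reorder what reaches the partition.
        producer.send(new ProducerRecord<String, String>("lroad", line)).get();
      }
    }
  }
}
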
      From: Raghu Angadi <ra...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
Cc: Thomas Groh <tg...@google.com>
 Sent: Wednesday, August 17, 2016 6:23 PM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
Thanks Amir for digging deeper into this.
On Wed, Aug 17, 2016 at 5:21 PM, amir bahmanyari <am...@yahoo.com> wrote:

Ok. I see its Kafka that doesn't send records to KafkaIO() in the same exact order as its being sent to it.I proved it with a stand alone consumer several times and it shows.As per Kafka docs suggestions, I recreated a new topic with number of partitions=1 which Kafka docs say that guarantees exact order in a single partition.It still doesn't send them in the right order even with the number of replications being just 1 i.e. no parallelism at all.

I would be very surprised if this is the case with Kafka. Are you publishing in single thread? 



   

Re: Is Beam pipeline runtime behavior inconsistent?

Posted by Raghu Angadi <ra...@google.com>.
Thanks Amir for digging deeper into this.

On Wed, Aug 17, 2016 at 5:21 PM, amir bahmanyari <am...@yahoo.com>
wrote:

> Ok. I see its Kafka that doesn't send records to KafkaIO() in the same
> exact order as its being sent to it.
> I proved it with a stand alone consumer several times and it shows.
> As per Kafka docs suggestions, I recreated a new topic with number of
> partitions=1 which Kafka docs say that guarantees exact order in a single
> partition.
> It still doesn't send them in the right order even with the number of
> replications being just 1 i.e. no parallelism at all.
>

I would be very surprised if this is the case with Kafka. Are you
publishing in single thread?

Re: Is Beam pipeline runtime behavior inconsistent?

Posted by amir bahmanyari <am...@yahoo.com>.
Sorry, I know this is not the Kafka forum. But I thought I would share this as the root cause of the inconsistency I have described below.

OK. I see it's Kafka that doesn't send records to KafkaIO() in the same exact order as they were sent to it. I proved it with a standalone consumer several times and it shows. As per the Kafka docs' suggestion, I recreated a new topic with partitions=1, which the docs say guarantees exact order within a single partition. It still doesn't send them in the right order, even with the replication factor being just 1, i.e. no parallelism at all.

My records MUST come in order as they have been sent. So, at the moment, it seems like KafkaIO() is not an option due to the above Kafka behavior. I know TextIO(), the poor man's way, works. Proven/tested fact.

Any other suggestions are appreciated.
Thanks+regards,
Amir-


Re: Is Beam pipeline runtime behavior inconsistent?

Posted by amir bahmanyari <am...@yahoo.com>.
Thanks very much Thomas.
I did all the "concurrency" tricks I had up my sleeve, plus the suggestions below. Same result. Need to do more research perhaps.
Any link you can point me to that discusses "concurrency" in Beam pls?
Have a great eve.
Amir-



Re: Is Beam pipeline runtime behavior inconsistent?

Posted by Thomas Groh <tg...@google.com>.
ConcurrentHashMaps can be interacted with in a way that does not preserve
the intended semantics. If you are using exclusively atomic mutation
operations (putIfAbsent(K, V), remove(K, V), replace(K, V, V)), you can
ensure that the mutation semantics are obtained; however, using a
ConcurrentMap purely like a map can cause Time-of-check-time-of-use errors.
Otherwise, ConcurrentMaps provide happens-before and visibility guarantees
only.
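
A minimal sketch (a hypothetical shared counter, not taken from the pipeline above) contrasting a check-then-act update with an atomic one built from the operations named above:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CounterExample {
  private static final ConcurrentMap<String, Long> COUNTS = new ConcurrentHashMap<>();

  // Check-then-act: two threads can read the same old value and one update is lost.
  static void incrementRacy(String key) {
    Long current = COUNTS.get(key);
    COUNTS.put(key, current == null ? 1L : current + 1L);
  }

  // Atomic alternative: retry putIfAbsent/replace until the swap succeeds.
  static void incrementAtomic(String key) {
    while (true) {
      Long current = COUNTS.putIfAbsent(key, 1L);
      if (current == null || COUNTS.replace(key, current, current + 1L)) {
        return;
      }
    }
  }
}

On Java 8, ConcurrentHashMap's merge() or compute() expresses the same atomic update more compactly.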

For the second question, this is mainly about interacting with mutable
per-element state - if you interact with, for example, mutable instance
fields that have a base and a current state, the base state must be reset
per-element. It doesn't sound like this is your problem.
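
As a small illustration of that per-element reset, a sketch in the older DoFn style used in this thread (class and field names are hypothetical):

// A DoFn whose mutable field is re-initialized at the top of processElement,
// so reuse of the instance across elements and bundles cannot leak state
// from a previous element.
static class TokenizeFn extends DoFn<String, String> {
  // Per-element scratch state; never assume it is "fresh" on entry.
  private transient StringBuilder scratch;

  @Override
  public void processElement(ProcessContext ctx) {
    scratch = new StringBuilder();          // reset the base state per element
    for (String token : ctx.element().split(",")) {
      scratch.append(token.trim()).append('|');
    }
    ctx.output(scratch.toString());
  }
}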

On Tue, Aug 9, 2016 at 11:31 AM, amir bahmanyari <am...@yahoo.com>
wrote:

> Hi Thomas,
> I spent time to digest all of this. I think I understand it to a good
> extent.
> The only hang up I still have is controlling the execution trajectory with
> persisting state which you say its not guaranteed in Beam.
> Have some further questions* Q* below & appreciate your valuable time to
> respond to them. I reiterated your statements in " " for quick reference
> above them.
>
> "We do not encourage sharing objects between DoFn instances, and any
> shared state must be accessed in a thread-safe manner, and modifications to
> shared state should be idempotent, as otherwise retries and speculative
> execution may cause that state to be inconsistent."
> *Q*: I persisted state in (single instance) Redis. I got varying result
> at each run.
> I then replaced Redis with java (static) ConcurrentHashMaps which are
> automatically thread safe. Interesting enough, the very first run after
> this change produced precise result & I thought I GOT IT! Re-run, and I got
> varying results again till this moment I am typing this email. How would
> you suggest to "any shared state must be accessed in a thread-safe manner"
> different than using Concurrent HashMaps?
>
>
> "A DoFn will be reused for multiple elements across a single bundle, and
> may be reused across multiple bundles - if you require the DoFn to be
> "fresh" per element, it should perform any required setup at the start of
> the ProcessElement method."
> *Q*: What do you suggest to "it should perform any required setup at the
> start of the ProcessElement method."?
> I can think of persisting the DoFn Obj's HashCode at the Object class
> level (every-time ProcessElement is invoked)  & compare it later on for
> uniqueness with Object's equals(Obj). It gets a little hairy when
> "parallelism" manifests in execution I know.
> I appreciate your suggestions.
>
>
> Thanks+have a great day.
> Amir
>
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Monday, August 8, 2016 1:44 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> There's no way to guarantee that exactly one record is processed at a
> time. This is part of the design of ParDo to work efficiently across
> multiple processes and machines[1], where multiple instances of a DoFn must
> exist in order for progress to be made in a timely fashion. This includes
> processing the same element across multiple machines at the same time, with
> only one of the results being available in the output PCollection, as well
> as retries of failed elements.
>
> A runner is required to interact with a DoFn instance in a single-threaded
> manner - however, it is permitted to have multiple different DoFn instances
> active within a single process and across processes at any given time (for
> the same reasons as above). There's no support in the Beam model to
> restrict this type of execution. We do not encourage sharing objects
> between DoFn instances, and any shared state must be accessed in a
> thread-safe manner, and modifications to shared state should be idempotent,
> as otherwise retries and speculative execution may cause that state to be
> inconsistent. A DoFn will be reused for multiple elements across a single
> bundle, and may be reused across multiple bundles - if you require the DoFn
> to be "fresh" per element, it should perform any required setup at the
> start of the ProcessElement method.
>
> The best that can be done if it is absolutely required to restrict
> processing to a single element at a time would be to group all of the
> elements to a single key. Note that this will not solve the problem in all
> cases, as a runner is permitted to execute the group of elements multiple
> times so long as it only takes one completed bundle as the result, and
> additionally this removes the ability of the runner to balance work and
> introduces a performance bottleneck. To do so, you would key the inputs to
> a single static key and apply a GroupByKey, running the processing method
> on the output Iterable produced by the GroupByKey (directly; expanding the
> input iterable in a separate PCollection allows a runner to rebalance the
> elements, which will reintroduce parallelism)`.
>
> [1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360
>
> On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Thomas,
> Thanks so much for your response. Here are answers to your questions.
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> ==>> I send records to Kafka from my laptop. I use KafkaIO() to receive
> the records. I have confirmed that I don't get duplicates from Kafka.
> However, for some reason, certain parts of my code execute beyond the
> expected number of records, and subsequently produce extra resulting
> data.
> I tried playing with the Triggering. Stretching the window interval,
> DiscardingFiredPanes etc. all kinds of modes.
> Same.  How can I guarantee that one record at a time executes in one
> unique instance of the inner class object?
> I have all the shared objects synchronized and am using Java concurrent
> hashmaps. How can I guarantee synchronized operations amongst "parallel
> pipelines"? Analogous to multiple threads accessing a shared object and
> trying to modify it...
>
> Here is my current KafkaIO() call:
> PCollection<String> kafkarecords = p.apply(KafkaIO.read()
>     .withBootstrapServers("kafkahost:9092").withTopics(topics)
>     .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
>     .apply(Values.<String>create())
>     .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes());
>
> kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new
>     DoFn<String, String>() { // I expect one record at a time to one object here
> -----------------------------------------------------------------------------------------------------------------------------------------------
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
> ==>>No duplicates from Kafka.
> -----------------------------------------------------------------------------------------------------------------------------------------------
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
> ==>>Sorry for my confusing statement. Like I mentioned above, I expect
> each record coming from Kafka gets assigned to one instance of the inner
> class and therefore one instance of the pipeline executes it in parallel
> with others executing their own unique records.
>
> -----------------------------------------------------------------------------------------------------------------------------------------------
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
> ==>>I have not tried DirectRunner. Should I?
>
> Thanks so much Thomas.
>
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org ; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 11:43 AM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Just to make sure I understand the problem:
>
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
>
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
>
>
> On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Colleagues,
> I refrained from posting this email before completing thorough testing.
> I think I did.
> My core code works perfectly & produces the expected result every single
> time without wrapping it with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated
> it and it always produced the right result.
> When I include a Beam KafkaIO and embed the exact same code in an anonymous
> class running Beam pipelines, I get a different result every time I rerun
> it.
> Below is the snippet from where KafkaIO executes till a record lands on
> method.
> Kafka sends precise number of records. No duplicates. all good.
> While executing in Beam, when the records are finished & I expect a
> correct result, it always produces something different.
> Different in different runs.
> I appreciate shedding light on this issue.  And thanks for your valuable
> time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
> // Create Beam Options for the Flink Runner.
> FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> // Set the Streaming engine as FlinkRunner
> options.setRunner(FlinkPipelineRunner.class);
> // This is a Streaming process (as opposed to Batch=false)
> options.setStreaming(true);
> //Create the DAG pipeline for parallel processing of independent LR records
> Pipeline p = Pipeline.create(options);
> //Kafka broker topic is identified as "lroad"
> List<String> topics = Arrays.asList("lroad");
>
> PCollection<String> kafkarecords = p.apply(KafkaIO.read()
>     .withBootstrapServers("kafkahost:9092").withTopics(topics)
>     .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
>     .apply(Values.<String>create())
>     .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.ZERO)
>         .accumulatingFiredPanes());
>
> kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new
>     DoFn<String, String>() {
>
>                         public void processElement(ProcessContext ctx)
> throws Exception {
>
>                                         *My core logic code here.*
> }));
> .
> .
> p.run(); // Start Beam Pipeline(s) in the Flink cluster
> } // of main
> }// of class
>
>
>
>
>
>
>
>

Re: Is Beam pipeline runtime behavior inconsistent?

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Thomas,

I spent time to digest all of this. I think I understand it to a good extent. The only hang-up I still have is controlling the execution trajectory with persisted state, which you say is not guaranteed in Beam. I have some further questions (Q) below & appreciate your valuable time to respond to them. I reiterated your statements in " " for quick reference above them.

"We do not encourage sharing objects between DoFn instances, and any shared state must be accessed in a thread-safe manner, and modifications to shared state should be idempotent, as otherwise retries and speculative execution may cause that state to be inconsistent."
Q: I persisted state in a (single instance) Redis. I got varying results at each run. I then replaced Redis with Java (static) ConcurrentHashMaps, which are automatically thread safe. Interestingly enough, the very first run after this change produced a precise result & I thought I GOT IT! On re-run, I got varying results again, up to this moment as I am typing this email. How would you suggest accessing shared state in a thread-safe manner other than by using ConcurrentHashMaps?

"A DoFn will be reused for multiple elements across a single bundle, and may be reused across multiple bundles - if you require the DoFn to be "fresh" per element, it should perform any required setup at the start of the ProcessElement method."
Q: What do you suggest for "it should perform any required setup at the start of the ProcessElement method"? I can think of persisting the DoFn object's hashCode at the class level (every time ProcessElement is invoked) & comparing it later on for uniqueness with Object's equals(Obj). I know it gets a little hairy once "parallelism" manifests in execution. I appreciate your suggestions.

Thanks+have a great day.Amir

      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 1:44 PM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
There's no way to guarantee that exactly one record is processed at a time. This is part of the design of ParDo to work efficiently across multiple processes and machines[1], where multiple instances of a DoFn must exist in order for progress to be made in a timely fashion. This includes processing the same element across multiple machines at the same time, with only one of the results being available in the output PCollection, as well as retries of failed elements.
A runner is required to interact with a DoFn instance in a single-threaded manner - however, it is permitted to have multiple different DoFn instances active within a single process and across processes at any given time (for the same reasons as above). There's no support in the Beam model to restrict this type of execution. We do not encourage sharing objects between DoFn instances, and any shared state must be accessed in a thread-safe manner, and modifications to shared state should be idempotent, as otherwise retries and speculative execution may cause that state to be inconsistent. A DoFn will be reused for multiple elements across a single bundle, and may be reused across multiple bundles - if you require the DoFn to be "fresh" per element, it should perform any required setup at the start of the ProcessElement method.
The best that can be done if it is absolutely required to restrict processing to a single element at a time would be to group all of the elements to a single key. Note that this will not solve the problem in all cases, as a runner is permitted to execute the group of elements multiple times so long as it only takes one completed bundle as the result, and additionally this removes the ability of the runner to balance work and introduces a performance bottleneck. To do so, you would key the inputs to a single static key and apply a GroupByKey, running the processing method on the output Iterable produced by the GroupByKey (directly; expanding the input iterable in a separate PCollection allows a runner to rebalance the elements, which will reintroduce parallelism)`.
[1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360
On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Thomas,
Thanks so much for your response. Here are answers to your questions.

You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
==>> I send records to Kafka from my laptop. I use KafkaIO() to receive the records. I have confirmed that I don't get duplicates from Kafka. However, for some reason, certain parts of my code execute beyond the expected number of records, and subsequently produce extra resulting data. I tried playing with the triggering: stretching the window interval, DiscardingFiredPanes, etc., all kinds of modes. Same result. How can I guarantee that one record at a time executes in one unique instance of the inner class object? I have all the shared objects synchronized and am using Java concurrent hashmaps. How can I guarantee synchronized operations amongst "parallel pipelines"? Analogous to multiple threads accessing a shared object and trying to modify it...
Here is my current KafkaIO() call:

PCollection<String> kafkarecords = p.apply(KafkaIO.read()
    .withBootstrapServers("kafkahost:9092").withTopics(topics)
    .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
    .apply(Values.<String>create())
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new
    DoFn<String, String>() { // I expect one record at a time to one object here
-----------------------------------------------------------------------------------------------------------------------------------------------
Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to kafkarecords)?
==>> No duplicates from Kafka.
-----------------------------------------------------------------------------------------------------------------------------------------------
Additionally, I'm not sure what you mean by "executes till a record lands on method"
==>> Sorry for my confusing statement. Like I mentioned above, I expect each record coming from Kafka gets assigned to one instance of the inner class and therefore one instance of the pipeline executes it in parallel with others executing their own unique records.
------------------------------ ------------------------------ ------------------------------ ------------------------------ -----------------------
Additionally additionally, is this reproducible if you execute with the DirectRunner? ==>>I have not tried DirectRunner. Should I? 
Thanks so much Thomas.

      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org ; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 11:43 AM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
  
Just to make sure I understand the problem:

You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to kafkarecords)?
Additionally, I'm not sure what you mean by "executes till a record lands on method"
Additionally additionally, is this reproducible if you execute with the DirectRunner?


On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Colleagues,
I refrained from posting this email before completing thorough testing. I think I did. My core code works perfectly & produces the expected result every single time without wrapping it with Beam KafkaIO to receive the data. Without KafkaIO, it receives the records from a flat data file. I repeated it and it always produced the right result. When I include a Beam KafkaIO and embed the exact same code in an anonymous class running Beam pipelines, I get a different result every time I rerun it. Below is the snippet from where KafkaIO executes till a record lands on method. Kafka sends a precise number of records. No duplicates. All good. While executing in Beam, when the records are finished & I expect a correct result, it always produces something different. Different in different runs. I appreciate shedding light on this issue. And thanks for your valuable time as always.
Amir-
public static synchronized void main(String[] args) throws Exception {

// Create Beam Options for the Flink Runner.
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
// Set the Streaming engine as FlinkRunner
options.setRunner(FlinkPipelineRunner.class);
// This is a Streaming process (as opposed to Batch=false)
options.setStreaming(true);
// Create the DAG pipeline for parallel processing of independent LR records
Pipeline p = Pipeline.create(options);
// Kafka broker topic is identified as "lroad"
List<String> topics = Arrays.asList("lroad");

PCollection<String> kafkarecords = p.apply(KafkaIO.read()
    .withBootstrapServers("kafkahost:9092").withTopics(topics)
    .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
    .apply(Values.<String>create())
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes());

kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
                        public void processElement(ProcessContext ctx) throws Exception {

                                        My core logic code here.
}));
.
.
p.run(); // Start Beam Pipeline(s) in the Flink cluster
} // of main
} // of class



   



  

Re: Is Beam pipeline runtime behavior inconsistent?

Posted by Thomas Groh <tg...@google.com>.
There's no way to guarantee that exactly one record is processed at a time.
This is part of the design of ParDo to work efficiently across multiple
processes and machines[1], where multiple instances of a DoFn must exist in
order for progress to be made in a timely fashion. This includes processing
the same element across multiple machines at the same time, with only one
of the results being available in the output PCollection, as well as
retries of failed elements.

A runner is required to interact with a DoFn instance in a single-threaded
manner - however, it is permitted to have multiple different DoFn instances
active within a single process and across processes at any given time (for
the same reasons as above). There's no support in the Beam model to
restrict this type of execution. We do not encourage sharing objects
between DoFn instances, and any shared state must be accessed in a
thread-safe manner, and modifications to shared state should be idempotent,
as otherwise retries and speculative execution may cause that state to be
inconsistent. A DoFn will be reused for multiple elements across a single
bundle, and may be reused across multiple bundles - if you require the DoFn
to be "fresh" per element, it should perform any required setup at the
start of the ProcessElement method.
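
As a rough illustration of that last point, a DoFn in the style of the pipeline from this thread might do its per-element setup like this (the scratch buffer and the output are placeholders, not the actual LR logic):

kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
  // Never assume this field is "fresh": the same DoFn instance may process many elements.
  private transient StringBuilder scratch;

  public void processElement(ProcessContext ctx) throws Exception {
    // Required setup at the start of processElement, so every element starts from a clean state.
    scratch = new StringBuilder();

    scratch.append(ctx.element());
    // ... per-element logic here ...
    ctx.output(scratch.toString());
  }
}));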

The best that can be done if it is absolutely required to restrict
processing to a single element at a time would be to group all of the
elements to a single key. Note that this will not solve the problem in all
cases, as a runner is permitted to execute the group of elements multiple
times so long as it only takes one completed bundle as the result, and
additionally this removes the ability of the runner to balance work and
introduces a performance bottleneck. To do so, you would key the inputs to
a single static key and apply a GroupByKey, running the processing method
on the output Iterable produced by the GroupByKey (directly; expanding the
input iterable in a separate PCollection allows a runner to rebalance the
elements, which will reintroduce parallelism).
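
A rough sketch of that single-key approach, in the style of the pipeline from this thread (the constant key, transform names and DoFn body are illustrative, and depending on the SDK version you may need to set coders explicitly):

PCollection<KV<String, Iterable<String>>> grouped = kafkarecords
    .apply(WithKeys.<String, String>of("all"))      // force every element onto one key
    .apply(GroupByKey.<String, String>create());

grouped.apply(ParDo.named("ProcessLRSequentially").of(
    new DoFn<KV<String, Iterable<String>>, String>() {
      public void processElement(ProcessContext ctx) throws Exception {
        // Iterate the grouped elements directly; expanding them into a new
        // PCollection would let the runner re-parallelize them.
        for (String record : ctx.element().getValue()) {
          // ... core logic here ...
        }
      }
    }));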

[1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360

On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <am...@yahoo.com>
wrote:

> Hi Thomas,
> Thanks so much for your response. Here are answers to your questions.
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> ==>> I send records to Kafka from my laptop. I use KafkaIO() to receive
> the records. I have confirmed that I don't get duplicates from Kafka.
> However, for some reason, certain parts of my code execute beyond the
> expected number of records, and subsequently produce extra resulting
> data.
> I tried playing with the Triggering. Stretching the window interval,
> DiscardingFiredPanes etc. all kinds of modes.
> Same.  How can I guarantee that one record at a time executes in one
> unique instance of the inner class object?
> I have all the shared objects synchronized and am using Java concurrent
> hashmaps. How can I guarantee synchronized operations amongst "parallel
> pipelines"? Analogous to multiple threads accessing a shared object and
> trying to modify it...
>
> Here is my current KafkaIO() call:
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers("kafkahost:9092").withTopics(topics).
> withValueCoder(StringUtf8Coder.of()).withoutMetadata()).apply(
> Values.<String>create()).apply(Window.<String>into(
> FixedWindows.of(Duration.standardMinutes(1)))
>          .triggering(AfterWatermark.pastEndOfWindow()).
> withAllowedLateness(Duration.ZERO)
>    .discardingFiredPanes());
>
>     kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new
> DoFn<String, String>() { // I expect one record at a time to one object here
> ------------------------------------------------------------
> ------------------------------------------------------------
> -----------------------
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
> ==>>No duplicates from Kafka.
> ------------------------------------------------------------
> ------------------------------------------------------------
> -----------------------
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
> ==>>Sorry for my confusing statement. Like I mentioned above, I expect
> each record coming from Kafka gets assigned to one instance of the inner
> class and therefore one instance of the pipeline executes it in parallel
> with others executing their own unique records.
>
> ------------------------------------------------------------
> ------------------------------------------------------------
> -----------------------
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
> ==>>I have not tried DirectRunner. Should I?
>
> Thanks so much Thomas.
>
>
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
>
> *Sent:* Monday, August 8, 2016 11:43 AM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Just to make sure I understand the problem:
>
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
>
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
>
>
> On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
> Hi Colleagues,
> I refrained from posting this email before completing thorough testing.
> I think I did.
> My core code works perfectly & produces the expected result every single
> time without wrapping it with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated
> it and it always produced the right result.
> When I include a Beam KafkaIO and embed the exact same code in an anonymous
> class running Beam pipelines, I get a different result every time I rerun
> it.
> Below is the snippet from where KafkaIO executes till a record lands on
> method.
> Kafka sends precise number of records. No duplicates. all good.
> While executing in Beam, when the records are finished & I expect a
> correct result, it always produces something different.
> Different in different runs.
> I appreciate shedding light on this issue.  And thanks for your valuable
> time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
> // Create Beam Options for the Flink Runner.
> FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> // Set the Streaming engine as FlinkRunner
> options.setRunner(FlinkPipelineRunner.class);
> // This is a Streaming process (as opposed to Batch=false)
> options.setStreaming(true);
> //Create the DAG pipeline for parallel processing of independent LR records
> Pipeline p = Pipeline.create(options);
> //Kafka broker topic is identified as "lroad"
> List<String> topics = Arrays.asList("lroad");
>
> PCollection<String> kafkarecords = p.apply(KafkaIO.read()
>     .withBootstrapServers("kafkahost:9092").withTopics(topics)
>     .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
>     .apply(Values.<String>create())
>     .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.ZERO)
>         .accumulatingFiredPanes());
>
> kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new
>     DoFn<String, String>() {
>
>                         public void processElement(ProcessContext ctx)
> throws Exception {
>
>                                         *My core logic code here.*
> }));
> .
> .
> p.run(); // Start Beam Pipeline(s) in the Flink cluster
> } // of main
> }// of class
>
>
>
>
>

Re: Is Beam pipeline runtime behavior inconsistent?

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Thomas,
Thanks so much for your response. Here are answers to your questions.

You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
==>> I send records to Kafka from my laptop. I use KafkaIO() to receive the records. I have confirmed that I don't get duplicates from Kafka. However, for some reason, certain parts of my code execute beyond the expected number of records, and subsequently produce extra resulting data. I tried playing with the triggering: stretching the window interval, DiscardingFiredPanes, etc., all kinds of modes. Same result. How can I guarantee that one record at a time executes in one unique instance of the inner class object? I have all the shared objects synchronized and am using Java concurrent hashmaps. How can I guarantee synchronized operations amongst "parallel pipelines"? Analogous to multiple threads accessing a shared object and trying to modify it...
Here is my current KafkaIO() call:

PCollection<String> kafkarecords = p.apply(KafkaIO.read()
    .withBootstrapServers("kafkahost:9092").withTopics(topics)
    .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
    .apply(Values.<String>create())
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new
    DoFn<String, String>() { // I expect one record at a time to one object here
-----------------------------------------------------------------------------------------------------------------------------------------------
Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to kafkarecords)?
==>> No duplicates from Kafka.
-----------------------------------------------------------------------------------------------------------------------------------------------
Additionally, I'm not sure what you mean by "executes till a record lands on method"
==>> Sorry for my confusing statement. Like I mentioned above, I expect each record coming from Kafka gets assigned to one instance of the inner class and therefore one instance of the pipeline executes it in parallel with others executing their own unique records.
-----------------------------------------------------------------------------------------------------------------------------------------------
Additionally additionally, is this reproducible if you execute with the DirectRunner? ==>>I have not tried DirectRunner. Should I? 
Thanks so much Thomas.

      From: Thomas Groh <tg...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Monday, August 8, 2016 11:43 AM
 Subject: Re: Is Beam pipeline runtime behavior inconsistent?
   
Just to make sure I understand the problem:

You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to kafkarecords)?
Additionally, I'm not sure what you mean by "executes till a record lands on method"
Additionally additionally, is this reproducible if you execute with the DirectRunner?


On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com> wrote:

Hi Colleagues,
I refrained from posting this email before completing thorough testing. I think I did. My core code works perfectly & produces the expected result every single time without wrapping it with Beam KafkaIO to receive the data. Without KafkaIO, it receives the records from a flat data file. I repeated it and it always produced the right result. When I include a Beam KafkaIO and embed the exact same code in an anonymous class running Beam pipelines, I get a different result every time I rerun it. Below is the snippet from where KafkaIO executes till a record lands on method. Kafka sends a precise number of records. No duplicates. All good. While executing in Beam, when the records are finished & I expect a correct result, it always produces something different. Different in different runs. I appreciate shedding light on this issue. And thanks for your valuable time as always.
Amir-
public static synchronized void main(String[] args) throws Exception {

// Create Beam Options for the Flink Runner.
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
// Set the Streaming engine as FlinkRunner
options.setRunner(FlinkPipelineRunner.class);
// This is a Streaming process (as opposed to Batch=false)
options.setStreaming(true);
// Create the DAG pipeline for parallel processing of independent LR records
Pipeline p = Pipeline.create(options);
// Kafka broker topic is identified as "lroad"
List<String> topics = Arrays.asList("lroad");

PCollection<String> kafkarecords = p.apply(KafkaIO.read()
    .withBootstrapServers("kafkahost:9092").withTopics(topics)
    .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
    .apply(Values.<String>create())
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes());

kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
                        public void processElement(ProcessContext ctx) throws Exception {

                                        My core logic code here.
}));
.
.
p.run(); // Start Beam Pipeline(s) in the Flink cluster
} // of main
} // of class



  

Re: Is Beam pipeline runtime behavior inconsistent?

Posted by Thomas Groh <tg...@google.com>.
Just to make sure I understand the problem:

You have a specific collection of records stored in Kafka. You run your
pipeline, and observe duplicate elements. Is that accurate?

Have you confirmed that you're getting duplicate records via other library
transforms (such as applying Count.globally() to kafkarecords)?
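
(For example, a minimal sketch of such a check; the logging DoFn and its name are illustrative, and on a non-globally-windowed collection Count.globally() needs withoutDefaults().)

kafkarecords
    .apply(Count.<String>globally().withoutDefaults())
    .apply(ParDo.named("LogWindowCounts").of(new DoFn<Long, Long>() {
      public void processElement(ProcessContext ctx) throws Exception {
        // One count per window/pane; compare it against the number of records sent to Kafka.
        System.out.println("Elements counted: " + ctx.element());
        ctx.output(ctx.element());
      }
    }));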

Additionally, I'm not sure what you mean by "executes till a record lands
on method"

Additionally additionally, is this reproducible if you execute with the
DirectRunner?
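
(If you try that, a rough sketch of switching the runner for a local run is below; the direct runner's class name has varied across early Beam releases, so use whichever class your version provides.)

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
// Everything runs in-process; no Flink cluster is involved.
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);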


On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <am...@yahoo.com>
wrote:

> Hi Colleagues,
> I refrained from posting this email before completing thorough testing.
> I think I did.
> My core code works perfectly & produces the expected result every single
> time without wrapping it with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated
> it and it always produced the right result.
> When I include a Beam KafkaIO and embed the exact same code in an anonymous
> class running Beam pipelines, I get a different result every time I rerun
> it.
> Below is the snippet from where KafkaIO executes till a record lands on
> method.
> Kafka sends precise number of records. No duplicates. all good.
> While executing in Beam, when the records are finished & I expect a
> correct result, it always produces something different.
> Different in different runs.
> I appreciate shedding light on this issue.  And thanks for your valuable
> time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
> // Create Beam Options for the Flink Runner.
> FlinkPipelineOptions options = PipelineOptionsFactory.as(
> FlinkPipelineOptions.class);
> // Set the Streaming engine as FlinkRunner
> options.setRunner(FlinkPipelineRunner.class);
> // This is a Streaming process (as opposed to Batch=false)
> options.setStreaming(true);
> //Create the DAG pipeline for parallel processing of independent LR records
> Pipeline p = Pipeline.create(options);
> //Kafka broker topic is identified as "lroad"
> List<String> topics = Arrays.asList("lroad");
>
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers("kafkahost:9092").withTopics(topics).
> withValueCoder(StringUtf8Coder.of()).withoutMetadata()).apply(
> Values.<String>create()).apply(Window.<String>into(
> FixedWindows.of(Duration.standardMinutes(1)))
>          .triggering(AfterWatermark.pastEndOfWindow()).
> withAllowedLateness(Duration.ZERO)
>    .accumulatingFiredPanes());
>
>     kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new
> DoFn<String, String>() {
>
>                         public void processElement(ProcessContext ctx)
> throws Exception {
>
>                                         *My core logic code here.*
> }));
> .
> .
> p.run(); // Start Beam Pipeline(s) in the Flink cluster
> } // of main
> }// of class
>

Re: Is Beam pipeline runtime behavior inconsistent?

Posted by amir bahmanyari <am...@yahoo.com>.
Hi colleagues,
Can someone help to solve this mystery please?
Thanks + regards,
Amir-

      From: amir bahmanyari <am...@yahoo.com>
 To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org> 
 Sent: Sunday, August 7, 2016 11:44 PM
 Subject: Is Beam pipeline runtime behavior inconsistent?
   
Hi Colleagues,
I refrained from posting this email before completing thorough testing. I think I did. My core code works perfectly & produces the expected result every single time without wrapping it with Beam KafkaIO to receive the data. Without KafkaIO, it receives the records from a flat data file. I repeated it and it always produced the right result. When I include a Beam KafkaIO and embed the exact same code in an anonymous class running Beam pipelines, I get a different result every time I rerun it. Below is the snippet from where KafkaIO executes till a record lands on method. Kafka sends a precise number of records. No duplicates. All good. While executing in Beam, when the records are finished & I expect a correct result, it always produces something different. Different in different runs. I appreciate shedding light on this issue. And thanks for your valuable time as always.
Amir-
public static synchronized void main(String[] args) throws Exception {

// Create Beam Options for the Flink Runner.
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
// Set the Streaming engine as FlinkRunner
options.setRunner(FlinkPipelineRunner.class);
// This is a Streaming process (as opposed to Batch=false)
options.setStreaming(true);
// Create the DAG pipeline for parallel processing of independent LR records
Pipeline p = Pipeline.create(options);
// Kafka broker topic is identified as "lroad"
List<String> topics = Arrays.asList("lroad");

PCollection<String> kafkarecords = p.apply(KafkaIO.read()
    .withBootstrapServers("kafkahost:9092").withTopics(topics)
    .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
    .apply(Values.<String>create())
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes());

kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
                        public void processElement(ProcessContext ctx) throws Exception {

                                        My core logic code here.
}));
.
.
p.run(); // Start Beam Pipeline(s) in the Flink cluster
} // of main
} // of class