Posted to user@beam.apache.org by amir bahmanyari <am...@yahoo.com> on 2016/05/26 23:19:11 UTC

Beam Flink vs Beam Spark Benchmarking

Hi Colleagues,
I have implemented the Java version of MIT's Linear Road algorithm as a Beam app.
I sanity tested it on a Flink cluster (FlinkRunner). It works fine: it receives tuples from Kafka, executes the LR algorithm, and produces the correct results.
I would like to repeat the same in a Spark cluster.
I am assuming that, other than changing the type of the runner (Flink vs Spark) at runtime, I should not make any code changes.
Is that the right assumption, based on what Beam promises regarding unification of the underlying streaming engines?

The real question is: what should I take into consideration if I want to benchmark Flink vs Spark by executing the same Beam LR app on both engines?
How would you approach the benchmarking process? What would you be looking for to compare?
Thanks so much for your valuable time.
Amir-

Re: Beam Flink vs Beam Spark Benchmarking

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
you could, as JB suggested, replace it with something like this:

PipelineOptions options
= PipelineOptionsFactory.fromArgs(args).withValidation().create();

and then give "--runner=FlinkPipelineRunner --streaming=true" to make the
runner configurable via command-line parameters.
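
A minimal end-to-end sketch of that setup (a sketch only, against the
incubator-era Beam API; the class name is made up):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerAgnosticMain {
  public static void main(String[] args) {
    // No runner-specific options class appears here; the runner comes from
    // the command line, e.g. --runner=FlinkPipelineRunner --streaming=true
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();

    Pipeline p = Pipeline.create(options);
    // ... build the runner-agnostic pipeline here ...
    p.run();
  }
}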

I think the problem with Kafka in the batch runner can be solved by putting
the Kafka jars into the lib folder of the Flink cluster nodes and
restarting the cluster.

--
Aljoscha

On Sat, 28 May 2016 at 01:08 Kaniska Gmail <ka...@gmail.com> wrote:

> Did you try FlinkKafkaConsumer and FlinkKafkaProducer?
>
> Flink+Kafka+Beam has been working very nicely for me, streaming millions
> of events.
>
> Also make sure you are packaging the latest Beam source code.
>
> Sent from my iPhone
>
> On May 27, 2016, at 2:48 PM, amir bahmanyari <am...@yahoo.com> wrote:
>
> Hi JB
> I replaced, in my code:
> FlinkPipelineOptions options =
> PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> with:
> PipelineOptions options = PipelineOptionsFactory.create();
>
> The compiler complained about setStreaming(true) not being supported, so I
> commented out options.setStreaming(true);
> It compiled fine.
> Then I ran it.
> It threw at p.run():
>
> ...Running thread threw: java.lang.UnsupportedOperationException: The
> transform Read(UnboundedKafkaSource) is currently not supported.
>         at
> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitPrimitiveTransform(FlinkBatchPipelineTranslator.java:100)
>         at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
>         at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>         at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>         at
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>         at
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
>         at
> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>         at
> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.translate(FlinkBatchPipelineTranslator.java:56)
>         at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:132)
>         at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:108)
>         at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>         at
> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:220)
>
>
> So I added .withMaxNumRecords(100) to the KafkaIO call.
> Restarted the Flink cluster. Reran the app.
> Got this exception at p.run() now. Have a wonderful weekend.
>
>
> ...Running thread  threw:  java.lang.RuntimeException: Pipeline execution
> failed
>         at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:117)
>         at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>         at
> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:220)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>         at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>         at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The
> program execution failed: Failed to submit job
> 6d197ab3c635428c287be6a2dd8f6d6e (readfromkafka2-abahman-0527214049)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>         at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>         at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154)
>         at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:114)
>         ... 14 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed
> to submit job 6d197ab3c635428c287be6a2dd8f6d6e
> (readfromkafka2-abahman-0527214049)
>         at org.apache.flink.runtime.jobmanager.JobManager.org
> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>         at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Creating the input
> splits caused an error: Could not create input splits from Source.
>         at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
>         at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)
>         at org.apache.flink.runtime.jobmanager.JobManager.org
> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)
>         ... 23 more
> Caused by: java.io.IOException: Could not create input splits from Source.
>         at
> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:113)
>         at
> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:44)
>         at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>         ... 25 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value
> org.apache.kafka.common.serialization.ByteArrayDeserializer for
> configuration value.deserializer: Class
> org.apache.kafka.common.serialization.ByteArrayDeserializer could not be
> found.
>         at
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)
>         at
> org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:145)
>         at
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)
>         at
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)
>         at
> org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:336)
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494)
>         at
> org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
>         at
> org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
>         at
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
>         at
> org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:168)
>         at
> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:105)
>         ... 27 more
>
>
>
> ------------------------------
> From: Jean-Baptiste Onofré <jb...@nanthrax.net>
> To: user@beam.incubator.apache.org
> Sent: Friday, May 27, 2016 1:33 AM
> Subject: Re: Beam Flink vs Beam Spark Benchmarking
>
> Hi Ismaël,
>
> as discussed together, clearly, the pipeline code should not use a
> runner-specific pipeline options object, in order to stay runner-agnostic.
>
> Something like:
>
> SparkPipelineOptions options =
> PipelineOptionsFactory.as(SparkPipelineOptions.class)
>
> should not be used.
> It's better to use something like:
>
>   PipelineOptions options =
> PipelineOptionsFactory.fromArgs(args).withValidation().create();
>
>
> However, I think we may improve the factory a bit.
>
> Regards
> JB
>
> On 05/27/2016 10:22 AM, Ismaël Mejía wrote:
> > I spent last week running tests on multiple runners, and in theory
> > you should not have to change much. However, you must take care not to
> > mix runner-specific dependencies when you create your project (e.g.
> > you don't want runner-specific classes like FlinkPipelineOptions or
> > SparkPipelineOptions in your code).
> >
> > Good benchmarking practice is a trickier subject; e.g. you must be sure
> > that both runners are using at least similar parallelism levels. There
> > are many dimensions to benchmarking, particularly in this space, so the
> > real question to start with is: what do you want to benchmark
> > (throughput, resource utilisation, etc.)? Is your pipeline batch only,
> > or streaming too? Then try to create a reproducible scenario in which
> > you expect similar behaviour across runners.
> >
> > One thing is clear: you should expect some differences, since the
> > internal model of each runner is different, as is its maturity level
> > (at least at this point).
> >
> > Ismaël
> >
> >
> > On Fri, May 27, 2016 at 1:19 AM, amir bahmanyari <amirtousa@yahoo.com
> > <ma...@yahoo.com>> wrote:
> >
> >    Hi Colleagues,
> >    I have implemented the Java version of MIT's Linear Road algorithm
> >    as a Beam app.
> >    I sanity tested it on a Flink cluster (FlinkRunner). It works fine:
> >    it receives tuples from Kafka, executes the LR algorithm, and
> >    produces the correct results.
> >    I would like to repeat the same in a Spark cluster.
> >    I am assuming that, other than changing the type of the runner
> >    (Flink vs Spark) at runtime, I should not make any code changes.
> >    Is that the right assumption, based on what Beam promises regarding
> >    unification of the underlying streaming engines?
> >
> >    The real question is: what should I take into consideration if I
> >    want to benchmark Flink vs Spark by executing the same Beam LR app
> >    on both engines?
> >    How would you approach the benchmarking process? What would you be
> >    looking for to compare?
> >    Thanks so much for your valuable time.
> >    Amir-
> >
> >
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
>
>

Re: Beam Flink vs Beam Spark Benchmarking

Posted by Kaniska Gmail <ka...@gmail.com>.
Did you try FlinkKafkaConsumer and FlinkKafkaProducer?

Flink+Kafka+Beam has been working very nicely for me, streaming millions of events.

Also make sure you are packaging the latest Beam source code.
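
For reference, a Flink-native Kafka source (bypassing Beam's KafkaIO
entirely) looks roughly like this; a sketch against the Flink 1.0-era
connector API, with placeholder broker, topic, and group names:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class FlinkNativeKafka {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
    props.setProperty("group.id", "lr-benchmark");            // placeholder

    // Pick the consumer variant matching the broker version
    // (FlinkKafkaConsumer08 for 0.8 brokers, 09 for 0.9).
    DataStream<String> tuples = env.addSource(
        new FlinkKafkaConsumer09<>("lr-topic", new SimpleStringSchema(), props));

    tuples.print();
    env.execute("flink-native-kafka-sketch");
  }
}

Note that this ties the source to Flink, so it gives up the runner
portability the rest of the thread is after.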

Sent from my iPhone

> On May 27, 2016, at 2:48 PM, amir bahmanyari <am...@yahoo.com> wrote:
> 
> Hi JB
> I replaced, in my code:
> FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> with:
> PipelineOptions options = PipelineOptionsFactory.create();
> 
> The compiler complained about setStreaming(true) not being supported. So I commented out options.setStreaming(true);
> It compiled fine.
> Then I ran it.
> It threw at p.run():
> 
> ...Running thread  threw:  java.lang.UnsupportedOperationException: The transform Read(UnboundedKafkaSource) is currently not supported.
>         at org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitPrimitiveTransform(FlinkBatchPipelineTranslator.java:100)
>         at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
>         at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>         at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>         at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>         at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
>         at org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>         at org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.translate(FlinkBatchPipelineTranslator.java:56)
>         at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:132)
>         at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:108)
>         at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>         at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:220)
> 
> 
> So I added .withMaxNumRecords(100) to the KafkaIO call.
> Restarted the Flink cluster. Reran the app.
> Got this exception at p.run() now. Have a wonderful weekend.
> 
> 
> ...Running thread  threw:  java.lang.RuntimeException: Pipeline execution failed
>         at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:117)
>         at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>         at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:220)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>         at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 6d197ab3c635428c287be6a2dd8f6d6e (readfromkafka2-abahman-0527214049)
>         at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>         at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>         at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>         at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>         at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154)
>         at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:114)
>         ... 14 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 6d197ab3c635428c287be6a2dd8f6d6e (readfromkafka2-abahman-0527214049)
>         at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>         at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not create input splits from Source.
>         at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)
>         at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)
>         ... 23 more
> Caused by: java.io.IOException: Could not create input splits from Source.
>         at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:113)
>         at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:44)
>         at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>         ... 25 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArrayDeserializer for configuration value.deserializer: Class org.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.
>         at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)
>         at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:145)
>         at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)
>         at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)
>         at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:336)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494)
>         at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
>         at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
>         at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
>         at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:168)
>         at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:105)
>         ... 27 more
> 
> 
> 
> From: Jean-Baptiste Onofré <jb...@nanthrax.net>
> To: user@beam.incubator.apache.org 
> Sent: Friday, May 27, 2016 1:33 AM
> Subject: Re: Beam Flink vs Beam Spark Benchmarking
> 
> Hi Ismaël,
> 
> as discussed together, clearly, the pipeline code should not use a
> runner-specific pipeline options object, in order to stay runner-agnostic.
> 
> Something like:
> 
> SparkPipelineOptions options = 
> PipelineOptionsFactory.as(SparkPipelineOptions.class)
> 
> should not be used.
> It's better to use something like:
> 
>   PipelineOptions options = 
> PipelineOptionsFactory.fromArgs(args).withValidation().create();
> 
> 
> However, I think we may improve the factory a bit.
> 
> Regards
> JB
> 
> On 05/27/2016 10:22 AM, Ismaël Mejía wrote:
> > I spent last week running tests on multiple runners, and in theory
> > you should not have to change much. However, you must take care not to
> > mix runner-specific dependencies when you create your project (e.g.
> > you don't want runner-specific classes like FlinkPipelineOptions or
> > SparkPipelineOptions in your code).
> >
> > Good benchmarking practice is a trickier subject; e.g. you must be sure
> > that both runners are using at least similar parallelism levels. There
> > are many dimensions to benchmarking, particularly in this space, so the
> > real question to start with is: what do you want to benchmark
> > (throughput, resource utilisation, etc.)? Is your pipeline batch only,
> > or streaming too? Then try to create a reproducible scenario in which
> > you expect similar behaviour across runners.
> >
> > One thing is clear: you should expect some differences, since the
> > internal model of each runner is different, as is its maturity level
> > (at least at this point).
> >
> > Ismaël
> >
> >
> > On Fri, May 27, 2016 at 1:19 AM, amir bahmanyari <amirtousa@yahoo.com
> > <ma...@yahoo.com>> wrote:
> >
> >    Hi Colleagues,
> >    I have implemented the Java version of MIT's Linear Road algorithm
> >    as a Beam app.
> >    I sanity tested it on a Flink cluster (FlinkRunner). It works fine:
> >    it receives tuples from Kafka, executes the LR algorithm, and
> >    produces the correct results.
> >    I would like to repeat the same in a Spark cluster.
> >    I am assuming that, other than changing the type of the runner
> >    (Flink vs Spark) at runtime, I should not make any code changes.
> >    Is that the right assumption, based on what Beam promises regarding
> >    unification of the underlying streaming engines?
> >
> >    The real question is: what should I take into consideration if I
> >    want to benchmark Flink vs Spark by executing the same Beam LR app
> >    on both engines?
> >    How would you approach the benchmarking process? What would you be
> >    looking for to compare?
> >    Thanks so much for your valuable time.
> >    Amir-
> >
> >
> 
> -- 
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 
> 
> 

Re: Beam Flink vs Beam Spark Benchmarking

Posted by amir bahmanyari <am...@yahoo.com>.
Hi JB,
I replaced, in my code:

FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);

with:

PipelineOptions options = PipelineOptionsFactory.create();

The compiler complained about setStreaming(true) not being supported, so I commented out options.setStreaming(true);
It compiled fine.
Then I ran it.
It threw at p.run():

...Running thread threw: java.lang.UnsupportedOperationException: The transform Read(UnboundedKafkaSource) is currently not supported.
        at org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitPrimitiveTransform(FlinkBatchPipelineTranslator.java:100)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
        at org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
        at org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.translate(FlinkBatchPipelineTranslator.java:56)
        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:132)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:108)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
        at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:220)

So I added .withMaxNumRecords(100) to the KafkaIO call.
Restarted the Flink cluster. Reran the app.
Got this exception at p.run() now. Have a wonderful weekend.

...Running thread threw: java.lang.RuntimeException: Pipeline execution failed
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:117)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
        at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:220)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 6d197ab3c635428c287be6a2dd8f6d6e (readfromkafka2-abahman-0527214049)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:114)
        ... 14 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 6d197ab3c635428c287be6a2dd8f6d6e (readfromkafka2-abahman-0527214049)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not create input splits from Source.
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)
        ... 23 more
Caused by: java.io.IOException: Could not create input splits from Source.
        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:113)
        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:44)
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
        ... 25 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArrayDeserializer for configuration value.deserializer: Class org.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.
        at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:145)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)
        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:336)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494)
        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
        at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:168)
        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:105)
        ... 27 more
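
For context, the KafkaIO read in question presumably looked something like
the sketch below; the broker and topic are placeholders, and the method
names follow the incubator-era KafkaIO API:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

import com.google.common.collect.ImmutableList;

public class BoundedKafkaReadSketch {
  public static void main(String[] args) {
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);

    p.apply(KafkaIO.read()
        .withBootstrapServers("localhost:9092")   // placeholder broker
        .withTopics(ImmutableList.of("lr-topic")) // placeholder topic
        // Caps the unbounded source so the batch translator can treat it
        // as a BoundedReadFromUnboundedSource:
        .withMaxNumRecords(100));

    p.run();
  }
}

The final ClassNotFound on ByteArrayDeserializer above is a packaging
problem rather than a pipeline problem: the Kafka client jars must be on
the cluster classpath (e.g. in Flink's lib folder, as suggested elsewhere
in this thread) or shaded into the job jar.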


From: Jean-Baptiste Onofré <jb...@nanthrax.net>
To: user@beam.incubator.apache.org
Sent: Friday, May 27, 2016 1:33 AM
Subject: Re: Beam Flink vs Beam Spark Benchmarking
Hi Ismaël,

as discussed together, clearly, the pipeline code should not use a
runner-specific pipeline options object, in order to stay runner-agnostic.

Something like:

SparkPipelineOptions options = 
PipelineOptionsFactory.as(SparkPipelineOptions.class)

should not be used.
It's better to use something like:

  PipelineOptions options = 
PipelineOptionsFactory.fromArgs(args).withValidation().create();


However, I think we may improve the factory a bit.

Regards
JB

On 05/27/2016 10:22 AM, Ismaël Mejía wrote:
> I spent last week running tests on multiple runners, and in theory
> you should not have to change much. However, you must take care not to
> mix runner-specific dependencies when you create your project (e.g.
> you don't want runner-specific classes like FlinkPipelineOptions or
> SparkPipelineOptions in your code).
>
> Good benchmarking practice is a trickier subject; e.g. you must be sure
> that both runners are using at least similar parallelism levels. There
> are many dimensions to benchmarking, particularly in this space, so the
> real question to start with is: what do you want to benchmark
> (throughput, resource utilisation, etc.)? Is your pipeline batch only,
> or streaming too? Then try to create a reproducible scenario in which
> you expect similar behaviour across runners.
>
> One thing is clear: you should expect some differences, since the
> internal model of each runner is different, as is its maturity level
> (at least at this point).
>
> Ismaël
>
>
> On Fri, May 27, 2016 at 1:19 AM, amir bahmanyari <amirtousa@yahoo.com
> <ma...@yahoo.com>> wrote:
>
>    Hi Colleagues,
>    I have implemented the Java version of MIT's Linear Road algorithm
>    as a Beam app.
>    I sanity tested it on a Flink cluster (FlinkRunner). It works fine:
>    it receives tuples from Kafka, executes the LR algorithm, and
>    produces the correct results.
>    I would like to repeat the same in a Spark cluster.
>    I am assuming that, other than changing the type of the runner
>    (Flink vs Spark) at runtime, I should not make any code changes.
>    Is that the right assumption, based on what Beam promises regarding
>    unification of the underlying streaming engines?
>
>    The real question is: what should I take into consideration if I
>    want to benchmark Flink vs Spark by executing the same Beam LR app
>    on both engines?
>    How would you approach the benchmarking process? What would you be
>    looking for to compare?
>    Thanks so much for your valuable time.
>    Amir-
>
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


  

Re: Beam Flink vs Beam Spark Benchmarking

Posted by amir bahmanyari <am...@yahoo.com>.
Thanks, gents, for your guidelines. I appreciate the time.
These are my first-pass benchmarking goals:
            • What worked?
            • What did not work?
            • What were setup and usability like?
            • What issues did we run into?
            • How did we resolve these issues?
            • Were we able to get the system operational?
            • Were we able to get the results we wanted?
            • Comparative performance analysis?

I am still researching the parameters I need to take into account for the last bullet: comparative performance analysis.
Have a great weekend.

From: Jean-Baptiste Onofré <jb...@nanthrax.net>
To: user@beam.incubator.apache.org
Sent: Friday, May 27, 2016 1:33 AM
Subject: Re: Beam Flink vs Beam Spark Benchmarking
Hi Ismaël,

as discussed together, clearly, the pipeline code should not use a
runner-specific pipeline options object, in order to stay runner-agnostic.

Something like:

SparkPipelineOptions options = 
PipelineOptionsFactory.as(SparkPipelineOptions.class)

should not be used.
It's better to use something like:

  PipelineOptions options = 
PipelineOptionsFactory.fromArgs(args).withValidation().create();


However, I think we may improve the factory a bit.

Regards
JB

On 05/27/2016 10:22 AM, Ismaël Mejía wrote:
> I spent last week running tests on multiple runners, and in theory
> you should not have to change much. However, you must take care not to
> mix runner-specific dependencies when you create your project (e.g.
> you don't want runner-specific classes like FlinkPipelineOptions or
> SparkPipelineOptions in your code).
>
> Good benchmarking practice is a trickier subject; e.g. you must be sure
> that both runners are using at least similar parallelism levels. There
> are many dimensions to benchmarking, particularly in this space, so the
> real question to start with is: what do you want to benchmark
> (throughput, resource utilisation, etc.)? Is your pipeline batch only,
> or streaming too? Then try to create a reproducible scenario in which
> you expect similar behaviour across runners.
>
> One thing is clear: you should expect some differences, since the
> internal model of each runner is different, as is its maturity level
> (at least at this point).
>
> Ismaël
>
>
> On Fri, May 27, 2016 at 1:19 AM, amir bahmanyari <amirtousa@yahoo.com
> <ma...@yahoo.com>> wrote:
>
>    Hi Colleagues,
>    I have implemented the Java version of MIT's Linear Road algorithm
>    as a Beam app.
>    I sanity tested it on a Flink cluster (FlinkRunner). It works fine:
>    it receives tuples from Kafka, executes the LR algorithm, and
>    produces the correct results.
>    I would like to repeat the same in a Spark cluster.
>    I am assuming that, other than changing the type of the runner
>    (Flink vs Spark) at runtime, I should not make any code changes.
>    Is that the right assumption, based on what Beam promises regarding
>    unification of the underlying streaming engines?
>
>    The real question is: what should I take into consideration if I
>    want to benchmark Flink vs Spark by executing the same Beam LR app
>    on both engines?
>    How would you approach the benchmarking process? What would you be
>    looking for to compare?
>    Thanks so much for your valuable time.
>    Amir-
>
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


  

Re: Beam Flink vs Beam Spark Benchmarking

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Ismaël,

as discussed together, clearly, the pipeline code should not use a
runner-specific pipeline options object, in order to stay runner-agnostic.

Something like:

SparkPipelineOptions options = 
PipelineOptionsFactory.as(SparkPipelineOptions.class)

should not be used.
It's better to use something like:

  PipelineOptions options = 
PipelineOptionsFactory.fromArgs(args).withValidation().create();
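
Building on that, app-specific knobs can stay runner-agnostic too via a
custom options interface; a sketch under the same incubator-era API, with
a made-up --topic option:

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public interface LinearRoadOptions extends PipelineOptions {
  @Description("Kafka topic carrying the LR tuples (placeholder)")
  @Default.String("lr-topic")
  String getTopic();
  void setTopic(String topic);
}

// Usage: registering the interface lets the factory validate --topic too.
//   PipelineOptionsFactory.register(LinearRoadOptions.class);
//   LinearRoadOptions options = PipelineOptionsFactory.fromArgs(args)
//       .withValidation().as(LinearRoadOptions.class);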


However, I think we may improve the factory a bit.

Regards
JB

On 05/27/2016 10:22 AM, Ismaël Mejía wrote:
> I spent last week running tests on multiple runners, and in theory
> you should not have to change much. However, you must take care not to
> mix runner-specific dependencies when you create your project (e.g.
> you don't want runner-specific classes like FlinkPipelineOptions or
> SparkPipelineOptions in your code).
>
> Good benchmarking practice is a trickier subject; e.g. you must be sure
> that both runners are using at least similar parallelism levels. There
> are many dimensions to benchmarking, particularly in this space, so the
> real question to start with is: what do you want to benchmark
> (throughput, resource utilisation, etc.)? Is your pipeline batch only,
> or streaming too? Then try to create a reproducible scenario in which
> you expect similar behaviour across runners.
>
> One thing is clear: you should expect some differences, since the
> internal model of each runner is different, as is its maturity level
> (at least at this point).
>
> Ismaël
>
>
> On Fri, May 27, 2016 at 1:19 AM, amir bahmanyari <amirtousa@yahoo.com
> <ma...@yahoo.com>> wrote:
>
>     Hi Colleagues,
>     I have implemented the Java version of MIT's Linear Road algorithm
>     as a Beam app.
>     I sanity tested it on a Flink cluster (FlinkRunner). It works fine:
>     it receives tuples from Kafka, executes the LR algorithm, and
>     produces the correct results.
>     I would like to repeat the same in a Spark cluster.
>     I am assuming that, other than changing the type of the runner
>     (Flink vs Spark) at runtime, I should not make any code changes.
>     Is that the right assumption, based on what Beam promises regarding
>     unification of the underlying streaming engines?
>
>     The real question is: what should I take into consideration if I
>     want to benchmark Flink vs Spark by executing the same Beam LR app
>     on both engines?
>     How would you approach the benchmarking process? What would you be
>     looking for to compare?
>     Thanks so much for your valuable time.
>     Amir-
>
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Beam Flink vs Beam Spark Benchmarking

Posted by Ismaël Mejía <ie...@gmail.com>.
I spent last week running tests on multiple runners, and in theory you
should not have to change much. However, you must take care not to mix
runner-specific dependencies when you create your project (e.g. you don't
want runner-specific classes like FlinkPipelineOptions or
SparkPipelineOptions in your code).

Good benchmarking practice is a trickier subject; e.g. you must be sure
that both runners are using at least similar parallelism levels. There are
many dimensions to benchmarking, particularly in this space, so the real
question to start with is: what do you want to benchmark (throughput,
resource utilisation, etc.)? Is your pipeline batch only, or streaming too?
Then try to create a reproducible scenario in which you expect similar
behaviour across runners.

One thing is clear: you should expect some differences, since the internal
model of each runner is different, as is its maturity level (at least at
this point).
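
As one concrete measurement along those lines, a trivial DoFn can emit the
per-record gap between the element's timestamp and the local clock, and the
resulting distributions can be compared across runners. A sketch in the
incubator-era DoFn style, not something prescribed by this thread:

import org.apache.beam.sdk.transforms.DoFn;

public class LatencyProbe extends DoFn<String, Long> {
  @Override
  public void processElement(ProcessContext c) {
    // Lag between the element's (source-assigned) timestamp and local
    // processing time, in milliseconds.
    long lagMillis = System.currentTimeMillis() - c.timestamp().getMillis();
    c.output(lagMillis);
  }
}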

Ismaël


On Fri, May 27, 2016 at 1:19 AM, amir bahmanyari <am...@yahoo.com>
wrote:

> Hi Colleagues,
> I have implemented the Java version of MIT's Linear Road algorithm as
> a Beam app.
> I sanity tested it on a Flink cluster (FlinkRunner). It works fine:
> it receives tuples from Kafka, executes the LR algorithm, and produces
> the correct results.
> I would like to repeat the same in a Spark cluster.
> I am assuming that, other than changing the type of the runner (Flink vs
> Spark) at runtime, I should not make any code changes.
> Is that the right assumption, based on what Beam promises regarding
> unification of the underlying streaming engines?
>
> The real question is: what should I take into consideration if I want to
> benchmark Flink vs Spark by executing the same Beam LR app on both engines?
> How would you approach the benchmarking process? What would you be looking
> for to compare?
> Thanks so much for your valuable time.
> Amir-
>