Posted to user@beam.apache.org by amir bahmanyari <am...@yahoo.com> on 2016/05/02 01:38:16 UTC
Re: KafkaIO Usage & Sample Code
Hi JB,
I rebuilt my code with the latest:
kafka-0.1.0-incubating-20160501.070733-11.jar
java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
Tried without setting withMaxNumRecords():
Throws java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
With withMaxNumRecords() set, I see the thread is running with no exceptions like the one above, waiting for incoming Kafka data, but processElement(ProcessContext ctx), the method that obtains the data, never executes.
Therefore, nothing goes into apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
I see the Kafka broker reports my laptop's IP address as connecting to it, OK.
Everything looks OK on the server side.
Doesn't look like it's my lucky day.
I appreciate any help/feedback/suggestions.
Cheers
From: Jean-Baptiste Onofré <jb...@nanthrax.net>
To: user@beam.incubator.apache.org
Sent: Friday, April 29, 2016 10:36 PM
Subject: Re: KafkaIO Usage & Sample Code
As I said in my previous e-mail, until recently DirectPipelineRunner
didn't support Unbounded.
It's now fixed, so if you take the latest nightly build, or build master,
it should work.
As a workaround, you can also limit the number of messages consumed from
Kafka (and so work with a bounded source).
Regards
JB
On 04/29/2016 07:12 PM, amir bahmanyari wrote:
> Hi colleagues,
> I am moving this conversation to this users mailing list as per Max’s
> suggestion.
> Thanks Max.
> Hi JB,
> Hope all is great.
> Is there a resolution to the exception I sent last night pls?
> When would the sample code to use KafkaIO be released?
> I really appreciate your valuable time. Below is the exception for your
> reference.
> This is how it gets used in my code:
>
> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>
> Have a wonderful weekend.
> Exception in thread "main" java.lang.IllegalStateException: no evaluator
> registered for Read(UnboundedKafkaSource)
> at
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> at
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> at
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> at
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> at
> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> at
> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> at
> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> Kind Regards,
> Amir
--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
Re: KafkaIO Usage & Sample Code
Posted by amir bahmanyari <am...@yahoo.com>.
My apologies Thomas.
I mean Hi Thomas.
Cheers
From: amir bahmanyari <am...@yahoo.com>
To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Sent: Monday, May 2, 2016 10:52 AM
Subject: Re: KafkaIO Usage & Sample Code
Hi Dan,
Sorry! I honestly don't know what "so long as you're on a commit after b2b77e380" means :)))
If this means there is a specific jar file I need to have on my path, could you point me to a link where I can get the right jar file pls?
I appreciate it sir.
Amir
From: Thomas Groh <tg...@google.com>
To: user@beam.incubator.apache.org
Sent: Monday, May 2, 2016 10:02 AM
Subject: Re: KafkaIO Usage & Sample Code
Your calls should work - so long as you're on a commit after b2b77e380 (when we started implementing PipelineRunner), the InProcessPipelineRunner should be a valid argument to PipelineOptions#setRunner
As an example, there's the InProcessPipelineRunnerTest (https://github.com/apache/incubator-beam/blob/b9116ac426f989af882e6df5dafc5da6c9f203d8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java#L73)
On Mon, May 2, 2016 at 9:43 AM, Dan Halperin <dh...@google.com> wrote:
Hi Amir,
The problem is likely in using DataflowPipelineOptions.class -- this is specific to the Cloud Dataflow service and the DataflowPipelineRunner. Try using just "PipelineOptions".
Dan
On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <am...@yahoo.com> wrote:
Thanks Dan.
I actually had tried it before but got compilation errors when setting the InProcessPipelineRunner in the PipelineOptions object.
I would appreciate it if you could point me to working sample code.
FYI, this is my implementation:
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
DataflowPipelineOptions Myoptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
Myoptions.setRunner(InProcessPipelineRunner.class);
I cannot set runner as InProcessPipelineRunner in the last line:
The method setRunner(Class<? extends PipelineRunner<?>>) in the type PipelineOptions is not applicable for the arguments (Class<InProcessPipelineRunner>).
Thanks for your help.
Amir-
From: Dan Halperin <dh...@google.com>
To: user@beam.incubator.apache.org
Sent: Monday, May 2, 2016 12:23 AM
Subject: Re: KafkaIO Usage & Sample Code
On Mon, May 2, 2016 at 12:22 AM, Dan Halperin <dh...@google.com> wrote:
Hi Amir,
As Frances suggested, you can use the InProcessPipelineRunner instead of the DirectPipelineRunner to execute your pipeline. (They're both in the codebase, it's just that the Direct runner is the default. Use the --runner command line option.)
Amending: it is relatively unlikely that the issues that we caught in testing would affect you. So it should be safe for your use case to do this -- and definitely safe to at least try it out!
Dan
On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari <am...@yahoo.com> wrote:
Thanks gents
What are our options in the meanwhile?
Cheers
Sent from my iPhone
On May 2, 2016, at 12:00 AM, Dan Halperin <dh...@google.com> wrote:
On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
Oh, thanks Frances.
I mixed DirectPipelineRunner ("old" local runner), and InProcessPipelineRunner ("new" local runner) ;)
We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
We would like to do this soon, but there are some snags.
As a preparation step, Thomas swapped the default runner from Direct to InProcess. (#178)
However, testing unfortunately exposed some issues with the InProcess runner. (Actually, I should say "fortunately" because the tests caught it! Yay!) So we had to roll it back. (#198)
Once we improve the InProcess runner, we can re-do the default swap. After the swap, once the tests keep passing for a few days, we do indeed intend to delete the current Direct pipeline runner and replace it with the current InProcess runner.
Dan
Regards
JB
On 05/02/2016 03:12 AM, Frances Perry wrote:
+Thomas, author of the InProcessPipelineRunner
The DirectPipelineRunner doesn't yet support unbounded PCollections. You
can try using the InProcessPipelineRunner, which is the re-write of
local execution that provides support for unbounded PCollections and
better checking against the Beam Model. (We'll be renaming this to the
DirectPipelineRunner in the near future to avoid having both as soon as
the functionality of the InProcessPipelineRunner is complete.)
On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <amirtousa@yahoo.com
<ma...@yahoo.com>> wrote:
Hi JB,
I rebuilt my code with the latest :
kafka-0.1.0-incubating-20160501.070733-11.jar
<https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
<https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
Tried _without setting withMaxNumRecords()_:
Throws java.lang.IllegalStateException: no evaluator registered for
Read(UnboundedKafkaSource)
at
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
_With setting withMaxNumRecords()_, I see the thread is running, no
exceptions like above, waiting for incoming Kafka data, but the
method obtaining the data from processElement(ProcessContext ctx)
never executes.
Therefore, nothing goes into
apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
I see Kafka Broker reports my laptop IP address as getting a
connection to it, OK.
Everything looks OK at the server side.
Doesn't look like it's my lucky day.
I appreciate any help/feedback/suggestions.
Cheers
------------------------------------------------------------------------
*From:* Jean-Baptiste Onofré <jb@nanthrax.net <ma...@nanthrax.net>>
*To:* user@beam.incubator.apache.org
<ma...@beam.incubator.apache.org>
*Sent:* Friday, April 29, 2016 10:36 PM
*Subject:* Re: KafkaIO Usage & Sample Code
As I said in my previous e-mail, until recently DirectPipelineRunner
didn't support Unbounded.
It's now fixed, so if you take a latest nightly build, or build master,
it should work.
As workaround, you can also limit the number of message consumed from
Kafka (and so work with bounded).
Regards
JB
On 04/29/2016 07:12 PM, amir bahmanyari wrote:
> Hi colleagues,
> I am moving this conversation to this users mailing list as per Max’s
> suggestion.
> Thanks Max.
> Hi JB,
> Hope all is great.
> Is there a resolution to the exception I sent last night pls?
> When would the sample code to use KafkaIO be released?
> I really appreciate your valuable time. Below is the exception
for your
> reference.
> This is how it gets used in my code:
>
>
p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>
> Have a wonderful weekend.
> Exception in thread "main" java.lang.IllegalStateException: no
evaluator
> registered for Read(UnboundedKafkaSource)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> at
>
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> at
>
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> at
>
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> at
>
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> at
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> at
>
benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> Kind Regards,
> Amir
--
Jean-Baptiste Onofré
jbonofre@apache.org <ma...@apache.org>
http://blog.nanthrax.net <http://blog.nanthrax.net/>
Talend - http://www.talend.com <http://www.talend.com/>
--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
Re: KafkaIO Usage & Sample Code
Posted by Raghu Angadi <ra...@google.com>.
On Wed, May 4, 2016 at 12:57 PM, amir bahmanyari <am...@yahoo.com>
wrote:
> The root cause ended up to be the Kafka version in my lab.
> Kafka server must be version 0.9.0+ for the KafkaIO call in the Beam app
> code to populate the PCollection object .
>
Please let us know if there is something KafkaIO could do for you to have
caught this earlier. I think older Kafka versions (especially 0.8.x) are
pretty widely used and many unsuspecting users will be affected. I will
check if there is a good way to detect this early in Kafka consumer.
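As a rough illustration of what such an early check could look like, here is a minimal sketch. It assumes the requirement is Kafka 0.9.0 or newer (the release that introduced the consumer API KafkaIO is built on) and that the broker version is already known as a string. KafkaVersionCheck and supportsNewConsumer are hypothetical names, not part of KafkaIO or the Kafka client, and how the version string would be obtained is left open.

```java
// Hypothetical helper for illustration only; not part of KafkaIO or the Kafka client.
class KafkaVersionCheck {
    // Returns true if a dotted version string such as "0.8.2.1" is at least 0.9.0.
    static boolean supportsNewConsumer(String version) {
        int[] minimum = {0, 9, 0};
        String[] parts = version.trim().split("\\.");
        for (int i = 0; i < minimum.length; i++) {
            // Missing components count as zero, so "0.9" is treated as "0.9.0".
            int part = i < parts.length ? Integer.parseInt(parts[i]) : 0;
            if (part != minimum[i]) {
                return part > minimum[i];
            }
        }
        return true; // equal to the minimum in every compared component
    }

    public static void main(String[] args) {
        System.out.println("0.8.2.1 supported: " + supportsNewConsumer("0.8.2.1"));
        System.out.println("0.9.0.1 supported: " + supportsNewConsumer("0.9.0.1"));
    }
}
```

A pipeline could call a check like this before building the read transform and fail with a clear message instead of silently waiting for records.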
> I know Unbounded is the ultimate way to achieve a true real-time streaming.
> Given that Unbounded is not available at the moment, is there a work
> around that makes every single record available immediately to the app?
>
Can you use the Google Dataflow runner? It fully supports UnboundedSource in
KafkaIO.
Raghu.
Re: KafkaIO Usage & Sample Code
Posted by amir bahmanyari <am...@yahoo.com>.
Thanks Raghu.
Yes, that's the email. Not sure how it magically evaporated from my inbox.
I will try the fix & let you know.
I am having some trouble running my Beam app in a Flink cluster (FlinkRunner)... I will post it in a different thread; hope someone can help :-)
Have a great weekend.
From: Raghu Angadi <ra...@google.com>
To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com>
Sent: Thursday, May 5, 2016 1:14 PM
Subject: Re: KafkaIO Usage & Sample Code
Were you looking for this? : https://mail-archives.apache.org/mod_mbox/beam-user/201605.mbox/%3CCAGwR7sBR9X7VoYuzJHymySb_5_tDUgQWUDz4Sdhop4eQu2GCEA@mail.gmail.com%3E
> Didnt cut it...Exceptions complaining "KafkaConsumer is not safe for multi-threaded access".
btw, Thomas sent me full thread stack for this exception. I have a fix for it here https://github.com/apache/incubator-beam/pull/290 . It will be very helpful if you could check it works in your app.
Raghu.
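For readers unfamiliar with the exception quoted above: a consumer that is meant to be used by one thread at a time can guard itself by recording which thread currently holds it and rejecting any other caller. The following is a minimal sketch of that general idea with a stand-in class. SingleThreadGuard is a hypothetical name; this is not Kafka's code and not the fix in the pull request. The guard is deliberately non-reentrant to keep it short.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for illustration only; not the Kafka client implementation.
class SingleThreadGuard {
    // Thread currently inside the guarded section, or null if nobody is.
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    // Claims the guard for the calling thread, or fails if it is already held.
    void acquire() {
        if (!owner.compareAndSet(null, Thread.currentThread())) {
            throw new ConcurrentModificationException(
                "resource is not safe for multi-threaded access");
        }
    }

    // Releases the guard, but only if the calling thread is the one holding it.
    void release() {
        owner.compareAndSet(Thread.currentThread(), null);
    }

    // Holds the guard on the calling thread and reports whether a second thread is rejected.
    static boolean rejectsSecondThread() {
        SingleThreadGuard guard = new SingleThreadGuard();
        AtomicReference<Boolean> rejected = new AtomicReference<>(false);
        guard.acquire();
        Thread other = new Thread(() -> {
            try {
                guard.acquire();
                guard.release();
            } catch (ConcurrentModificationException e) {
                rejected.set(true);
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            guard.release();
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("second thread rejected: " + rejectsSecondThread());
    }
}
```

Under this model, an error like the one above points to two threads touching the same consumer instance at once, which is consistent with the symptom of re-running the pipeline in a loop.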
On Thu, May 5, 2016 at 7:10 AM, amir bahmanyari <am...@yahoo.com> wrote:
Hi Raghu,
I noticed you replied to this thread yesterday regarding users getting affected by this Kafka version difference etc.,
and Google Dataflow libs working fine for unbounded KafkaIO etc.
I don't see that email in my inbox anymore. I might have accidentally deleted it.
Could you resend it pls? I appreciate it...
Have a great day.
From: amir bahmanyari <am...@yahoo.com>
To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Sent: Wednesday, May 4, 2016 12:57 PM
Subject: Re: KafkaIO Usage & Sample Code
The root cause ended up being the Kafka version in my lab.
The Kafka server must be version 0.9.0+ for the KafkaIO call in the Beam app code to populate the PCollection object.
Thanks so much to Thomas for diagnosing that.
I appreciate all the valuable time he spent with me offline.
Status:
Bounded: Works when setting either InProcessPipelineRunner or DirectPipelineRunner.
Unbounded: Throws different exceptions for the two runners above. Thomas has the different stack traces.
At the moment, I have withMaxNumRecords(100) set in my KafkaIO call.
This causes the call to block until all 100 records are received, and only then makes them available to the app.
I tried running p.run() in a while(true) loop with withMaxNumRecords(100) set, so that I would get chunks of 100 records in multiple files created by TextIO.
Didn't cut it... Exceptions complaining "KafkaConsumer is not safe for multi-threaded access".
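The blocking behavior described here can be modeled without Beam or Kafka. This is a minimal sketch, assuming an endless in-memory iterator as a stand-in for the topic. BoundedReadSketch and readBounded are hypothetical names, and this is a rough model of the observed behavior rather than how KafkaIO implements withMaxNumRecords().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Rough model for illustration only; not Beam's implementation.
class BoundedReadSketch {
    // Drains up to maxNumRecords from a possibly endless source and returns them as one batch.
    static <T> List<T> readBounded(Iterator<T> source, int maxNumRecords) {
        List<T> batch = new ArrayList<>();
        // The size check comes first so the source is not polled once the cap is reached.
        while (batch.size() < maxNumRecords && source.hasNext()) {
            batch.add(source.next());
        }
        // Nothing reaches the caller until the loop has finished.
        return batch;
    }

    // Endless counter standing in for an unbounded topic.
    static Iterator<Integer> endlessCounter() {
        return new Iterator<Integer>() {
            private int next = 0;
            @Override public boolean hasNext() { return true; }
            @Override public Integer next() { return next++; }
        };
    }

    public static void main(String[] args) {
        Iterator<Integer> topic = endlessCounter();
        List<Integer> first = readBounded(topic, 100);
        List<Integer> second = readBounded(topic, 100);
        System.out.println(first.size() + " records, next chunk starts at " + second.get(0));
    }
}
```

In this model each chunk only becomes visible after the cap is reached, which matches the delay described above; per-record delivery needs an unbounded read on a runner that supports it.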
I know Unbounded is the ultimate way to achieve true real-time streaming.
Given that Unbounded is not available at the moment, is there a workaround that makes every single record available immediately to the app?
Thanks everyone again for your valuable help.
Amir-
From: Thomas Groh <tg...@google.com>
To: user@beam.incubator.apache.org
Sent: Monday, May 2, 2016 11:41 AM
Subject: Re: KafkaIO Usage & Sample Code
Yeah, JB has it - if that commit (https://github.com/apache/incubator-beam/commit/b2b77e380) is in your history, the call should compile correctly; if it's not, then the InProcessPipelineRunner doesn't implement the appropriate interface, and that call won't typecheck (and lead to your compilation failure) - syncing to a more recent version should fix the problem.
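The type check described above can be reproduced without Beam. The following is a minimal sketch using stand-in types: Runner, NewLocalRunner, OldLocalRunner and Options are hypothetical names, not Beam classes. It assumes a setter declared like the setRunner(Class<? extends PipelineRunner<?>>) signature quoted in the compiler error.

```java
// Stand-in types for illustration only; these are NOT the Beam classes.
abstract class Runner<ResultT> {
    abstract ResultT run();
}

// Extends Runner, so Class<NewLocalRunner> matches Class<? extends Runner<?>>.
class NewLocalRunner extends Runner<String> {
    @Override
    String run() {
        return "ran in process";
    }
}

// Does not extend Runner, like a runner class from before the type was adopted.
class OldLocalRunner {
    String run() {
        return "ran";
    }
}

class Options {
    private Class<? extends Runner<?>> runner;

    // Only classes that extend Runner are accepted here at compile time.
    void setRunner(Class<? extends Runner<?>> runner) {
        this.runner = runner;
    }

    Class<? extends Runner<?>> getRunner() {
        return runner;
    }
}

class RunnerTypeCheckDemo {
    public static void main(String[] args) {
        Options options = new Options();
        options.setRunner(NewLocalRunner.class);    // compiles
        // options.setRunner(OldLocalRunner.class); // rejected: not applicable for Class<OldLocalRunner>
        System.out.println(options.getRunner().getSimpleName());
    }
}
```

Uncommenting the second call produces the same kind of "not applicable for the arguments" error, because the argument class is outside the bound of the parameter type.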
On Mon, May 2, 2016 at 11:26 AM, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
Thomas meant that you have to checkout after or at this commit:
git checkout b2b77e380
;)
Regards
JB
On 05/02/2016 07:52 PM, amir bahmanyari wrote:
Hi Dan,
Sorry! I honestly dont know what "so long as you're on a commit after
b2b77e380" means :)))
If this means there is a specific jar file I need to have on my path,
could you point me to a link where I can get the right jar file pls?
I appreciate it sir.
Amir
------------------------------------------------------------------------
*From:* Thomas Groh <tg...@google.com>
*To:* user@beam.incubator.apache.org
*Sent:* Monday, May 2, 2016 10:02 AM
*Subject:* Re: KafkaIO Usage & Sample Code
Your calls should work - so long as you're on a commit after b2b77e380
(when we started implementing PipelineRunner), the
InProcessPipelineRunner should be a valid argument to
PipelineOptions#setRunner
As an example, there's the InProcessPipelineRunnerTest
(https://github.com/apache/incubator-beam/blob/b9116ac426f989af882e6df5dafc5da6c9f203d8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java#L73)
On Mon, May 2, 2016 at 9:43 AM, Dan Halperin <dhalperi@google.com
<ma...@google.com>> wrote:
Hi Amir,
The problem is likely in using DataflowPipelineOptions.class -- this
is specific to the Cloud Dataflow service and the
DataflowPipelineRunner. Try using just "PipelineOptions".
Dan
On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <amirtousa@yahoo.com
<ma...@yahoo.com>> wrote:
Thanks Dan.
I actually had tried it before but got compilation errors at
setting the InProcessPipelineRunner in the PipelineOptions object..
I appreciate it if you point me to a working sample code.
FYI, This is my implementation:
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
DataflowPipelineOptions Myoptions =
PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
Myoptions.setRunner(InProcessPipelineRunner.class);
I cannot set runner as InProcessPipelineRunner in the last line:
The method setRunner(Class<? extends PipelineRunner<?>>) in the
type PipelineOptions is not applicable for the arguments
(Class<InProcessPipelineRunner>).
Thanks for your help.
Amir-
------------------------------------------------------------------------
*From:* Dan Halperin <dhalperi@google.com
<ma...@google.com>>
*To:* user@beam.incubator.apache.org
<ma...@beam.incubator.apache.org>
*Sent:* Monday, May 2, 2016 12:23 AM
*Subject:* Re: KafkaIO Usage & Sample Code
On Mon, May 2, 2016 at 12:22 AM, Dan Halperin
<dhalperi@google.com <ma...@google.com>> wrote:
Hi Amir,
As Frances suggested, you can use the
InProcessPipelineRunner instead of the DirectPipelineRunner
to execute your pipeline. (They're both in the codebase,
it's just that the Direct runner is the default. Use the
--runner command line option.)
Amending: it is relatively unlikely that the issues that we
caught in testing would affect you. So it should be safe for
your use case to do this -- and definitely safe to at least try
it out!
Dan
On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari
<amirtousa@yahoo.com <ma...@yahoo.com>> wrote:
Thanks gents
What are our options in the meanwhile?
Cheers
Sent from my iPhone
On May 2, 2016, at 12:00 AM, Dan Halperin
<dhalperi@google.com <ma...@google.com>> wrote:
On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré
<jb@nanthrax.net <ma...@nanthrax.net>> wrote:
Oh, thanks Frances.
I mixed DirectPipelineRunner ("old" local runner),
and InProcessPipelineRunner ("new" local runner) ;)
We should remove the DirectPipelineRunner to avoid
confusion. WDYT ?
We would like to do this soon, but there are some snags.
As a preparation step, Thomas swapped the default
runner from Direct to InProcess. (#178
<https://github.com/apache/incubator-beam/pull/178>)
However, testing unfortunately exposed some issues
with the InProcess runner. (Actually, I should say
"fortunately" because the tests caught it! Yay!) So we
had to roll it back. (#198
<https://github.com/apache/incubator-beam/pull/198>)
Once we improve the InProcess runner, we can re-do the
default swap. After the swap, once the tests keep
passing for a few days, we do indeed intend to delete
the current Direct pipeline runner and replace it with
the current InProcess runner.
Dan
Regards
JB
On 05/02/2016 03:12 AM, Frances Perry wrote:
+Thomas, author of the InProcessPipelineRunner
The DirectPipelineRunner doesn't yet support
unbounded PCollections. You
can try using the InProcessPipelineRunner,
which is the re-write of
local execution that provides support for
unbounded PCollections and
better checking against the Beam Model. (We'll
be renaming this to the
DirectPipelineRunner in the near future to
avoid having both as soon as
the functionality of the
InProcessPipelineRunner is complete.)
On Sun, May 1, 2016 at 4:38 PM, amir
bahmanyari <amirtousa@yahoo.com
<ma...@yahoo.com>
<mailto:amirtousa@yahoo.com
<ma...@yahoo.com>>> wrote:
Hi JB,
I rebuilt my code with the latest :
kafka-0.1.0-incubating-20160501.070733-11.jar
<https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
<https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
Tried _without setting withMaxNumRecords()_:
Throws java.lang.IllegalStateException: no
evaluator registered for
Read(UnboundedKafkaSource)
at
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
_With setting withMaxNumRecords()_, I see
the thread is running, no
exceptions like above, waiting for
incoming Kafka data, but the
method obtaining the data from
processElement(ProcessContext ctx)
never executes.
Therefore, nothing goes into
apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
I see Kafka Broker reports my laptop IP
address as getting a
connection to it, OK.
Everything looks OK at the server side.
Doesn't look like it's my lucky day.
I appreciate any help/feedback/suggestions.
Cheers
------------------------------------------------------------------------
*From:* Jean-Baptiste Onofré
<jb@nanthrax.net <ma...@nanthrax.net>
<mailto:jb@nanthrax.net <ma...@nanthrax.net>>>
*To:* user@beam.incubator.apache.org
<ma...@beam.incubator.apache.org>
<mailto:user@beam.incubator.apache.org
<ma...@beam.incubator.apache.org>>
*Sent:* Friday, April 29, 2016 10:36 PM
*Subject:* Re: KafkaIO Usage & Sample Code
As I said in my previous e-mail, until
recently DirectPipelineRunner
didn't support Unbounded.
It's now fixed, so if you take a latest
nightly build, or build master,
it should work.
As workaround, you can also limit the
number of message consumed from
Kafka (and so work with bounded).
Regards
JB
On 04/29/2016 07:12 PM, amir bahmanyari wrote:
> Hi colleagues,
> I am moving this conversation to this
users mailing list as per Max’s
> suggestion.
> Thanks Max.
> Hi JB,
> Hope all is great.
> Is there a resolution to the exception
I sent last night pls?
> When would the sample code to use
KafkaIO be released?
> I really appreciate your valuable time.
Below is the exception
for your
> reference.
> This is how it gets used in my code:
>
>
p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>
> Have a wonderful weekend.
> Exception in thread "main"
java.lang.IllegalStateException: no
evaluator
> registered for Read(UnboundedKafkaSource)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> at
>
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> at
>
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> at
>
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> at
>
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> at
>
org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> at
org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> at
>
benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> Kind Regards,
> Amir
--
Jean-Baptiste Onofré
jbonofre@apache.org
<ma...@apache.org>
<mailto:jbonofre@apache.org
<ma...@apache.org>>
http://blog.nanthrax.net
<http://blog.nanthrax.net/>
<http://blog.nanthrax.net/>
Talend - http://www.talend.com
<http://www.talend.com/> <http://www.talend.com/>
--
Jean-Baptiste Onofré
jbonofre@apache.org <ma...@apache.org>
http://blog.nanthrax.net <http://blog.nanthrax.net/>
Talend - http://www.talend.com
<http://www.talend.com/>
--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
Re: KafkaIO Usage & Sample Code
Posted by Raghu Angadi <ra...@google.com>.
Were you looking for this? :
https://mail-archives.apache.org/mod_mbox/beam-user/201605.mbox/%3CCAGwR7sBR9X7VoYuzJHymySb_5_tDUgQWUDz4Sdhop4eQu2GCEA@mail.gmail.com%3E
> Didnt cut it...Exceptions complaining "KafkaConsumer is not safe for
multi-threaded access".
btw, Thomas sent me full thread stack for this exception. I have a fix for
it here https://github.com/apache/incubator-beam/pull/290 . It will be very
helpful if you could check it works in your app.
Raghu.
On Thu, May 5, 2016 at 7:10 AM, amir bahmanyari <am...@yahoo.com> wrote:
> Hi Raghu,
> I noticed you replied to this thread yesterday regarding users getting
> affected by this Kafka version difference etc.
> And Google DataFlow libs working fine for Unbound KafkaIO etc.
> I dont see that email in my inbox anymore. I might have accidentally
> deleted it.
> Could you resend it pls? I appreciate it...
> Have a great day.
>
>
> ------------------------------
> *From:* amir bahmanyari <am...@yahoo.com>
> *To:* "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
> *Sent:* Wednesday, May 4, 2016 12:57 PM
>
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> The root cause ended up to be the Kafka version in my lab.
> Kafka server must be version 0.9.0+ for the KafkaIO call in the Beam app
> code to populate the PCollection object .
> Thanks Thomas so much for diagnosing that.
> Appreciate all his valuable time he spent with me offline.
>
> Status:
> *Bounded*:
> *Works *when setting either InProcessPipelineRunner or
> DirectPipelineRunner
>
> *Unbounded*:
> *Throws *different exceptions for the different above runners.
> Thomas has the different stacktraces.
>
> At the moment, I have withMaxNumRecords(100) set in my KafkaIO call.
> This causes the call to block till all the 100 records are received, and
> then makes it available to the app to consume it.
> I tried running the p.run() in a while(true) loop &
> setting withMaxNumRecords(100) so I get chunks of 100 records in multiple
> files created by TextIO.
> Didnt cut it...Exceptions complaining "KafkaConsumer is not safe for
> multi-threaded access".
>
> I know Unbounded is the ultimate way to achieve a true real-time streaming.
> Given that Unbounded is not available at the moment, is there a work
> around that makes every single record available immediately to the app?
>
> Thanks everyone again for your valuable help.
> Amir-
> ------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org
> *Sent:* Monday, May 2, 2016 11:41 AM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> Yeah, JB has it - if that commit (
> https://github.com/apache/incubator-beam/commit/b2b77e380) is in your
> history, the call should compile correctly; if it's not, then the
> InProcessPipelineRunner doesn't implement the appropriate interface, and
> that call won't typecheck (and lead to your compilation failure) - syncing
> to a more recent version should fix the problem.
>
> On Mon, May 2, 2016 at 11:26 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
> wrote:
>
> Thomas meant that you have to checkout after or at this commit:
>
> git checkout b2b77e380
>
> ;)
>
> Regards
> JB
>
> On 05/02/2016 07:52 PM, amir bahmanyari wrote:
>
> Hi Dan,
> Sorry! I honestly dont know what "so long as you're on a commit after
> b2b77e380" means :)))
> If this means there is a specific jar file I need to have on my path,
> could you point me to a link where I can get the right jar file pls?
> I appreciate it sir.
> Amir
>
>
> ------------------------------------------------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org
> *Sent:* Monday, May 2, 2016 10:02 AM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> Your calls should work - so long as you're on a commit after b2b77e380
> (when we started implementing PipelineRunner), the
> InProcessPipelineRunner should be a valid argument to
> PipelineOptions#setRunner
>
> As an example, there's the InProcessPipelineRunnerTest
> (
> https://github.com/apache/incubator-beam/blob/b9116ac426f989af882e6df5dafc5da6c9f203d8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java#L73
> )
>
> On Mon, May 2, 2016 at 9:43 AM, Dan Halperin <dhalperi@google.com
> <ma...@google.com>> wrote:
>
> Hi Amir,
>
> The problem is likely in using DataflowPipelineOptions.class -- this
> is specific to the Cloud Dataflow service and the
> DataflowPipelineRunner. Try using just "PipelineOptions".
>
> Dan
>
> On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <amirtousa@yahoo.com
> <ma...@yahoo.com>> wrote:
>
> Thanks Dan.
> I actually had tried it before but got compilation errors at
> setting the InProcessPipelineRunner in the PipelineOptions
> object..
> I appreciate it if you point me to a working sample code.
> FYI, This is my implementation:
> import com.google.cloud.dataflow.sdk.options.PipelineOptions;
> import
> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
> DataflowPipelineOptions Myoptions =
> PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
> Myoptions.setRunner(InProcessPipelineRunner.class);
>
> I cannot set runner as InProcessPipelineRunner in the last line:
> The method setRunner(Class<? extends PipelineRunner<?>>) in the
> type PipelineOptions is not applicable for the arguments
> (Class<InProcessPipelineRunner>).
> Thanks for your help.
> Amir-
>
>
>
> ------------------------------------------------------------------------
> *From:* Dan Halperin <dhalperi@google.com
> <ma...@google.com>>
> *To:* user@beam.incubator.apache.org
> <ma...@beam.incubator.apache.org>
> *Sent:* Monday, May 2, 2016 12:23 AM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> On Mon, May 2, 2016 at 12:22 AM, Dan Halperin
> <dhalperi@google.com <ma...@google.com>> wrote:
>
> Hi Amir,
>
> As Frances suggested, you can use the
> InProcessPipelineRunner instead of the DirectPipelineRunner
> to execute your pipeline. (They're both in the codebase,
> it's just that the Direct runner is the default. Use the
> --runner command line option.)
>
>
> Amending: it is relatively unlikely that the issues that we
> caught in testing would affect you. So it should be safe for
> your use case to do this -- and definitely safe to at least try
> it out!
>
>
> Dan
>
> On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari
> <amirtousa@yahoo.com <ma...@yahoo.com>> wrote:
>
> Thanks gents
> What are our options in the meanwhile?
> Cheers
>
> Sent from my iPhone
>
> On May 2, 2016, at 12:00 AM, Dan Halperin
> <dhalperi@google.com <ma...@google.com>> wrote:
>
> On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré
> <jb@nanthrax.net <ma...@nanthrax.net>> wrote:
>
> Oh, thanks Frances.
>
> I mixed DirectPipelineRunner ("old" local runner),
> and InProcessPipelineRunner ("new" local runner) ;)
>
> We should remove the DirectPipelineRunner to avoid
> confusion. WDYT ?
>
>
> We would like to do this soon, but there are some snags.
>
> As a preparation step, Thomas swapped the default
> runner from Direct to InProcess. (#178
> <https://github.com/apache/incubator-beam/pull/178>)
>
> However, testing unfortunately exposed some issues
> with the InProcess runner. (Actually, I should say
> "fortunately" because the tests caught it! Yay!) So we
> had to roll it back. (#198
> <https://github.com/apache/incubator-beam/pull/198>)
>
>
> Once we improve the InProcess runner, we can re-do the
> default swap. After the swap, once the tests keep
> passing for a few days, we do indeed intend to delete
> the current Direct pipeline runner and replace it with
> the current InProcess runner.
>
> Dan
>
>
> Regards
> JB
>
> On 05/02/2016 03:12 AM, Frances Perry wrote:
>
> +Thomas, author of the InProcessPipelineRunner
>
> The DirectPipelineRunner doesn't yet support
> unbounded PCollections. You
> can try using the InProcessPipelineRunner,
> which is the re-write of
> local execution that provides support for
> unbounded PCollections and
> better checking against the Beam Model. (We'll
> be renaming this to the
> DirectPipelineRunner in the near future to
> avoid having both as soon as
> the functionality of the
> InProcessPipelineRunner is complete.)
>
> On Sun, May 1, 2016 at 4:38 PM, amir
> bahmanyari <amirtousa@yahoo.com
> <ma...@yahoo.com>
> <mailto:amirtousa@yahoo.com
>
> <ma...@yahoo.com>>> wrote:
>
> Hi JB,
> I rebuilt my code with the latest :
> kafka-0.1.0-incubating-20160501.070733-11.jar
> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
>
> Tried _without setting withMaxNumRecords()_:
> Throws java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
> at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>
> _With withMaxNumRecords() set_, I see
> the thread is running, no
> exceptions like above, waiting for
> incoming Kafka data, but the
> processElement(ProcessContext ctx) method
> that obtains the data never executes.
> Therefore, nothing goes into
> apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>
> I see the Kafka broker reports my laptop's IP
> address as getting a
> connection to it, OK.
> Everything looks OK on the server side.
> Doesn't look like it's my lucky day.
> I appreciate any help/feedback/suggestions.
> Cheers
>
>
>
> ------------------------------------------------------------------------
> *From:* Jean-Baptiste Onofré
> <jb@nanthrax.net <ma...@nanthrax.net>
> <mailto:jb@nanthrax.net <ma...@nanthrax.net>>>
> *To:* user@beam.incubator.apache.org
> <ma...@beam.incubator.apache.org>
> <mailto:user@beam.incubator.apache.org
>
> <ma...@beam.incubator.apache.org>>
> *Sent:* Friday, April 29, 2016 10:36 PM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
>
> As I said in my previous e-mail, until
> recently DirectPipelineRunner
> didn't support Unbounded.
>
> It's now fixed, so if you take a latest
> nightly build, or build master,
> it should work.
>
> As workaround, you can also limit the
> number of message consumed from
> Kafka (and so work with bounded).
>
> Regards
> JB
>
> On 04/29/2016 07:12 PM, amir bahmanyari wrote:
> > Hi colleagues,
> > I am moving this conversation to this users mailing list as per Max’s
> > suggestion.
> > Thanks Max.
> > Hi JB,
> > Hope all is great.
> > Is there a resolution to the exception I sent last night pls?
> > When would the sample code to use KafkaIO be released?
> > I really appreciate your valuable time. Below is the exception for your
> > reference.
> > This is how it gets used in my code:
> >
> > p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
> >
> > Have a wonderful weekend.
> > Exception in thread "main" java.lang.IllegalStateException: no evaluator
> > registered for Read(UnboundedKafkaSource)
> > at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> > at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> > at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> > at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> > at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> > at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> > Kind Regards,
> > Amir
>
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> <ma...@apache.org>
> <mailto:jbonofre@apache.org
> <ma...@apache.org>>
> http://blog.nanthrax.net
> <http://blog.nanthrax.net/>
> <http://blog.nanthrax.net/>
> Talend - http://www.talend.com
> <http://www.talend.com/> <http://www.talend.com/>
>
>
>
>
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org <ma...@apache.org>
> http://blog.nanthrax.net <http://blog.nanthrax.net/>
> Talend - http://www.talend.com
> <http://www.talend.com/>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
Re: KafkaIO Usage & Sample Code
Posted by amir bahmanyari <am...@yahoo.com>.
Hi Raghu,
I noticed you replied to this thread yesterday about users being affected by this Kafka version difference, and about the Google Dataflow libs working fine with unbounded KafkaIO.
I don't see that email in my inbox anymore; I might have accidentally deleted it. Could you resend it, please? I appreciate it.
Have a great day.
From: amir bahmanyari <am...@yahoo.com>
To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Sent: Wednesday, May 4, 2016 12:57 PM
Subject: Re: KafkaIO Usage & Sample Code
The root cause turned out to be the Kafka version in my lab. The Kafka server must be version 0.9.0+ for the KafkaIO call in the Beam app code to populate the PCollection object. Thanks so much to Thomas for diagnosing that; I appreciate all the valuable time he spent with me offline.
Status:
Bounded: Works when setting either InProcessPipelineRunner or DirectPipelineRunner.
Unbounded: Throws different exceptions for the two runners above. Thomas has the stack traces.
At the moment, I have withMaxNumRecords(100) set in my KafkaIO call. This causes the call to block until all 100 records are received, and only then makes them available to the app. I tried running p.run() in a while(true) loop with withMaxNumRecords(100) set, so that I would get chunks of 100 records in multiple files created by TextIO. That didn't cut it: I got exceptions complaining "KafkaConsumer is not safe for multi-threaded access".
I know Unbounded is the ultimate way to achieve true real-time streaming. Given that Unbounded is not available at the moment, is there a workaround that makes every single record available to the app immediately?
Thanks again, everyone, for your valuable help.
Amir
From: Thomas Groh <tg...@google.com>
To: user@beam.incubator.apache.org
Sent: Monday, May 2, 2016 11:41 AM
Subject: Re: KafkaIO Usage & Sample Code
Yeah, JB has it - if that commit (https://github.com/apache/incubator-beam/commit/b2b77e380) is in your history, the call should compile correctly; if it's not, then the InProcessPipelineRunner doesn't implement the appropriate interface, and that call won't typecheck (and lead to your compilation failure) - syncing to a more recent version should fix the problem.
On Mon, May 2, 2016 at 11:26 AM, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
Thomas meant that you have to checkout after or at this commit:
git checkout b2b77e380
;)
Regards
JB
On 05/02/2016 07:52 PM, amir bahmanyari wrote:
Hi Dan,
Sorry! I honestly don't know what "so long as you're on a commit after
b2b77e380" means :)))
If this means there is a specific jar file I need to have on my path,
could you point me to a link where I can get the right jar file pls?
I appreciate it, sir.
Amir
------------------------------------------------------------------------
*From:* Thomas Groh <tg...@google.com>
*To:* user@beam.incubator.apache.org
*Sent:* Monday, May 2, 2016 10:02 AM
*Subject:* Re: KafkaIO Usage & Sample Code
Your calls should work - so long as you're on a commit after b2b77e380
(when we started implementing PipelineRunner), the
InProcessPipelineRunner should be a valid argument to
PipelineOptions#setRunner
As an example, there's the InProcessPipelineRunnerTest
(https://github.com/apache/incubator-beam/blob/b9116ac426f989af882e6df5dafc5da6c9f203d8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java#L73)
On Mon, May 2, 2016 at 9:43 AM, Dan Halperin <dhalperi@google.com
<ma...@google.com>> wrote:
Hi Amir,
The problem is likely in using DataflowPipelineOptions.class -- this
is specific to the Cloud Dataflow service and the
DataflowPipelineRunner. Try using just "PipelineOptions".
Dan
On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <amirtousa@yahoo.com
<ma...@yahoo.com>> wrote:
Thanks Dan.
I actually had tried it before but got compilation errors at
setting the InProcessPipelineRunner in the PipelineOptions object..
I appreciate it if you point me to a working sample code.
FYI, This is my implementation:
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
DataflowPipelineOptions Myoptions =
PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
Myoptions.setRunner(InProcessPipelineRunner.class);
I cannot set runner as InProcessPipelineRunner in the last line:
The method setRunner(Class<? extends PipelineRunner<?>>) in the
type PipelineOptions is not applicable for the arguments
(Class<InProcessPipelineRunner>).
Thanks for your help.
Amir-
------------------------------------------------------------------------
*From:* Dan Halperin <dhalperi@google.com
<ma...@google.com>>
*To:* user@beam.incubator.apache.org
<ma...@beam.incubator.apache.org>
*Sent:* Monday, May 2, 2016 12:23 AM
*Subject:* Re: KafkaIO Usage & Sample Code
On Mon, May 2, 2016 at 12:22 AM, Dan Halperin
<dhalperi@google.com <ma...@google.com>> wrote:
Hi Amir,
As Frances suggested, you can use the
InProcessPipelineRunner instead of the DirectPipelineRunner
to execute your pipeline. (They're both in the codebase,
it's just that the Direct runner is the default. Use the
--runner command line option.)
Amending: it is relatively unlikely that the issues that we
caught in testing would affect you. So it should be safe for
your use case to do this -- and definitely safe to at least try
it out!
Dan
On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari
<amirtousa@yahoo.com <ma...@yahoo.com>> wrote:
Thanks gents
What are our options in the meanwhile?
Cheers
Sent from my iPhone
On May 2, 2016, at 12:00 AM, Dan Halperin
<dhalperi@google.com <ma...@google.com>> wrote:
On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré
<jb@nanthrax.net <ma...@nanthrax.net>> wrote:
Oh, thanks Frances.
I mixed DirectPipelineRunner ("old" local runner),
and InProcessPipelineRunner ("new" local runner) ;)
We should remove the DirectPipelineRunner to avoid
confusion. WDYT ?
We would like to do this soon, but there are some snags.
As a preparation step, Thomas swapped the default
runner from Direct to InProcess. (#178
<https://github.com/apache/incubator-beam/pull/178>)
However, testing unfortunately exposed some issues
with the InProcess runner. (Actually, I should say
"fortunately" because the tests caught it! Yay!) So we
had to roll it back. (#198
<https://github.com/apache/incubator-beam/pull/198>)
Once we improve the InProcess runner, we can re-do the
default swap. After the swap, once the tests keep
passing for a few days, we do indeed intend to delete
the current Direct pipeline runner and replace it with
the current InProcess runner.
Dan
Regards
JB
On 05/02/2016 03:12 AM, Frances Perry wrote:
+Thomas, author of the InProcessPipelineRunner
The DirectPipelineRunner doesn't yet support
unbounded PCollections. You
can try using the InProcessPipelineRunner,
which is the re-write of
local execution that provides support for
unbounded PCollections and
better checking against the Beam Model. (We'll
be renaming this to the
DirectPipelineRunner in the near future to
avoid having both as soon as
the functionality of the
InProcessPipelineRunner is complete.)
On Sun, May 1, 2016 at 4:38 PM, amir
bahmanyari <amirtousa@yahoo.com
<ma...@yahoo.com>
<mailto:amirtousa@yahoo.com
<ma...@yahoo.com>>> wrote:
Hi JB,
I rebuilt my code with the latest :
kafka-0.1.0-incubating-20160501.070733-11.jar
<https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
<https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
Tried _without setting withMaxNumRecords()_:
Throws java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
_With withMaxNumRecords() set_, I see
the thread is running, no
exceptions like above, waiting for
incoming Kafka data, but the
processElement(ProcessContext ctx) method
that obtains the data never executes.
Therefore, nothing goes into
apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
I see the Kafka broker reports my laptop's IP
address as getting a
connection to it, OK.
Everything looks OK on the server side.
Doesn't look like it's my lucky day.
I appreciate any help/feedback/suggestions.
Cheers
------------------------------------------------------------------------
*From:* Jean-Baptiste Onofré
<jb@nanthrax.net <ma...@nanthrax.net>
<mailto:jb@nanthrax.net <ma...@nanthrax.net>>>
*To:* user@beam.incubator.apache.org
<ma...@beam.incubator.apache.org>
<mailto:user@beam.incubator.apache.org
<ma...@beam.incubator.apache.org>>
*Sent:* Friday, April 29, 2016 10:36 PM
*Subject:* Re: KafkaIO Usage & Sample Code
As I said in my previous e-mail, until
recently DirectPipelineRunner
didn't support Unbounded.
It's now fixed, so if you take a latest
nightly build, or build master,
it should work.
As workaround, you can also limit the
number of message consumed from
Kafka (and so work with bounded).
Regards
JB
On 04/29/2016 07:12 PM, amir bahmanyari wrote:
> Hi colleagues,
> I am moving this conversation to this users mailing list as per Max’s
> suggestion.
> Thanks Max.
> Hi JB,
> Hope all is great.
> Is there a resolution to the exception I sent last night pls?
> When would the sample code to use KafkaIO be released?
> I really appreciate your valuable time. Below is the exception for your
> reference.
> This is how it gets used in my code:
>
> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>
> Have a wonderful weekend.
> Exception in thread "main" java.lang.IllegalStateException: no evaluator
> registered for Read(UnboundedKafkaSource)
> at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> Kind Regards,
> Amir
--
Jean-Baptiste Onofré
jbonofre@apache.org
<ma...@apache.org>
<mailto:jbonofre@apache.org
<ma...@apache.org>>
http://blog.nanthrax.net
<http://blog.nanthrax.net/>
<http://blog.nanthrax.net/>
Talend - http://www.talend.com
<http://www.talend.com/> <http://www.talend.com/>
--
Jean-Baptiste Onofré
jbonofre@apache.org <ma...@apache.org>
http://blog.nanthrax.net <http://blog.nanthrax.net/>
Talend - http://www.talend.com
<http://www.talend.com/>
--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
Re: KafkaIO Usage & Sample Code
Posted by amir bahmanyari <am...@yahoo.com>.
The root cause turned out to be the Kafka version in my lab. The Kafka server must be version 0.9.0+ for the KafkaIO call in the Beam app code to populate the PCollection object. Thanks so much to Thomas for diagnosing that; I appreciate all the valuable time he spent with me offline.
Status:
Bounded: Works when setting either InProcessPipelineRunner or DirectPipelineRunner.
Unbounded: Throws different exceptions for the two runners above. Thomas has the stack traces.
At the moment, I have withMaxNumRecords(100) set in my KafkaIO call. This causes the call to block until all 100 records are received, and only then makes them available to the app. I tried running p.run() in a while(true) loop with withMaxNumRecords(100) set, so that I would get chunks of 100 records in multiple files created by TextIO. That didn't cut it: I got exceptions complaining "KafkaConsumer is not safe for multi-threaded access".
I know Unbounded is the ultimate way to achieve true real-time streaming. Given that Unbounded is not available at the moment, is there a workaround that makes every single record available to the app immediately?
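To make the difference concrete, here is a toy Java sketch of the two delivery styles described above. It does not use Beam or Kafka: the class name BoundedVsPerRecordSketch and both methods are hypothetical, and an in-memory iterator stands in for the Kafka topic. readBounded mimics what withMaxNumRecords(100) looks like from the app's point of view (nothing is handed over until the quota has been read), while readPerRecord hands each record to the app as soon as it is read, which is the behavior an unbounded read is meant to provide.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class BoundedVsPerRecordSketch {

    /**
     * Bounded style: the caller gets nothing until maxRecords have been read
     * (or the source runs dry). With a live source, this is where the call blocks.
     */
    public static <T> List<T> readBounded(Iterator<T> source, int maxRecords) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxRecords && source.hasNext()) {
            batch.add(source.next());
        }
        return batch;
    }

    /**
     * Per-record style: each record is passed to the handler as soon as it is read.
     * Returns the number of records delivered.
     */
    public static <T> int readPerRecord(Iterator<T> source, Consumer<T> handler) {
        int count = 0;
        while (source.hasNext()) {
            handler.accept(source.next());
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // In-memory stand-in for records arriving on a topic (an assumption for this sketch).
        List<String> records = Arrays.asList("r1", "r2", "r3", "r4", "r5");

        // The whole batch becomes visible at once, after three records were read.
        System.out.println("batch: " + readBounded(records.iterator(), 3));

        // Each record becomes visible immediately.
        readPerRecord(records.iterator(), r -> System.out.println("record: " + r));
    }
}
```

In the toy model the only difference is when the app sees the data: once per batch, or once per record.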
Thanks again, everyone, for your valuable help.
Amir
Re: KafkaIO Usage & Sample Code
Posted by Thomas Groh <tg...@google.com>.
Yeah, JB has it - if that commit (
https://github.com/apache/incubator-beam/commit/b2b77e380) is in your
history, the call should compile correctly; if it's not, then the
InProcessPipelineRunner doesn't implement the appropriate interface, and
that call won't typecheck (and lead to your compilation failure) - syncing
to a more recent version should fix the problem.
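
The type-check failure can be reproduced without Beam. The sketch below is only an illustration under assumptions: PipelineRunner, Options, OldInProcessRunner and NewInProcessRunner are hypothetical stand-ins, not the real Beam classes. It shows why a setRunner parameter of type Class<? extends PipelineRunner<?>> rejects a runner class that does not extend PipelineRunner and accepts one that does.

```java
public class RunnerBoundSketch {

    /** Stand-in for a pipeline result type (hypothetical). */
    interface Result {}

    /** Stand-in for the runner base type that setRunner is bounded by (hypothetical). */
    abstract static class PipelineRunner<R extends Result> {
        abstract R run();
    }

    /** Stand-in for an options object with the same setRunner signature shape. */
    static class Options {
        private Class<? extends PipelineRunner<?>> runner;

        void setRunner(Class<? extends PipelineRunner<?>> runner) {
            this.runner = runner;
        }

        Class<? extends PipelineRunner<?>> getRunner() {
            return runner;
        }
    }

    /** Models a runner class from before it extended PipelineRunner. */
    static class OldInProcessRunner {}

    /** Models the same runner once it extends PipelineRunner. */
    static class NewInProcessRunner extends PipelineRunner<Result> {
        @Override
        Result run() {
            return new Result() {};
        }
    }

    /** Configures the stand-in options and returns the chosen runner's simple name. */
    public static String configuredRunnerName() {
        Options options = new Options();

        // Does not compile: Class<OldInProcessRunner> is not a
        // Class<? extends PipelineRunner<?>>, so the method is "not applicable".
        // options.setRunner(OldInProcessRunner.class);

        // Compiles: NewInProcessRunner is a subtype of PipelineRunner<?>.
        options.setRunner(NewInProcessRunner.class);
        return options.getRunner().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(configuredRunnerName());
    }
}
```

Uncommenting the OldInProcessRunner line gives a compiler error of the same shape as the one quoted below, which is why building against a version that includes the commit is the fix rather than any change to the calling code.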
On Mon, May 2, 2016 at 11:26 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:
> Thomas meant that you have to checkout after or at this commit:
>
> git checkout b2b77e380
>
> ;)
>
> Regards
> JB
>
> On 05/02/2016 07:52 PM, amir bahmanyari wrote:
>
>> Hi Dan,
>> Sorry! I honestly don't know what "so long as you're on a commit after
>> b2b77e380" means :)))
>> If this means there is a specific jar file I need to have on my path,
>> could you point me to a link where I can get the right jar file pls?
>> I appreciate it, sir.
>> Amir
>>
>>
>> ------------------------------------------------------------------------
>> *From:* Thomas Groh <tg...@google.com>
>> *To:* user@beam.incubator.apache.org
>> *Sent:* Monday, May 2, 2016 10:02 AM
>> *Subject:* Re: KafkaIO Usage & Sample Code
>>
>> Your calls should work - so long as you're on a commit after b2b77e380
>> (when we started implementing PipelineRunner), the
>> InProcessPipelineRunner should be a valid argument to
>> PipelineOptions#setRunner
>>
>> As an example, there's the InProcessPipelineRunnerTest
>> (
>> https://github.com/apache/incubator-beam/blob/b9116ac426f989af882e6df5dafc5da6c9f203d8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java#L73
>> )
>>
>> On Mon, May 2, 2016 at 9:43 AM, Dan Halperin <dhalperi@google.com
>> <ma...@google.com>> wrote:
>>
>> Hi Amir,
>>
>> The problem is likely in using DataflowPipelineOptions.class -- this
>> is specific to the Cloud Dataflow service and the
>> DataflowPipelineRunner. Try using just "PipelineOptions".
>>
>> Dan
>>
>> On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <amirtousa@yahoo.com
>> <ma...@yahoo.com>> wrote:
>>
>> Thanks Dan.
>> I actually had tried it before but got compilation errors at
>> setting the InProcessPipelineRunner in the PipelineOptions
>> object..
>> I appreciate it if you point me to a working sample code.
>> FYI, This is my implementation:
>> import com.google.cloud.dataflow.sdk.options.PipelineOptions;
>> import
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
>> DataflowPipelineOptions Myoptions =
>> PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
>> Myoptions.setRunner(InProcessPipelineRunner.class);
>>
>> I cannot set runner as InProcessPipelineRunner in the last line:
>> The method setRunner(Class<? extends PipelineRunner<?>>) in the
>> type PipelineOptions is not applicable for the arguments
>> (Class<InProcessPipelineRunner>).
>> Thanks for your help.
>> Amir-
>>
>>
>>
>> ------------------------------------------------------------------------
>> *From:* Dan Halperin <dhalperi@google.com
>> <ma...@google.com>>
>> *To:* user@beam.incubator.apache.org
>> <ma...@beam.incubator.apache.org>
>> *Sent:* Monday, May 2, 2016 12:23 AM
>> *Subject:* Re: KafkaIO Usage & Sample Code
>>
>> On Mon, May 2, 2016 at 12:22 AM, Dan Halperin
>> <dhalperi@google.com <ma...@google.com>> wrote:
>>
>> Hi Amir,
>>
>> As Frances suggested, you can use the
>> InProcessPipelineRunner instead of the DirectPipelineRunner
>> to execute your pipeline. (They're both in the codebase,
>> it's just that the Direct runner is the default. Use the
>> --runner command line option.)
>>
>>
>> Amending: it is relatively unlikely that the issues that we
>> caught in testing would affect you. So it should be safe for
>> your use case to do this -- and definitely safe to at least try
>> it out!
>>
>>
>> Dan
>>
>> On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari
>> <amirtousa@yahoo.com <ma...@yahoo.com>> wrote:
>>
>> Thanks gents
>> What are our options in the meanwhile?
>> Cheers
>>
>> Sent from my iPhone
>>
>> On May 2, 2016, at 12:00 AM, Dan Halperin
>> <dhalperi@google.com <ma...@google.com>> wrote:
>>
>> On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré
>>> <jb@nanthrax.net <ma...@nanthrax.net>> wrote:
>>>
>>> Oh, thanks Frances.
>>>
>>> I mixed DirectPipelineRunner ("old" local runner),
>>> and InProcessPipelineRunner ("new" local runner) ;)
>>>
>>> We should remove the DirectPipelineRunner to avoid
>>> confusion. WDYT ?
>>>
>>>
>>> We would like to do this soon, but there are some snags.
>>>
>>> As a preparation step, Thomas swapped the default
>>> runner from Direct to InProcess. (#178
>>> <https://github.com/apache/incubator-beam/pull/178>)
>>>
>>> However, testing unfortunately exposed some issues
>>> with the InProcess runner. (Actually, I should say
>>> "fortunately" because the tests caught it! Yay!) So we
>>> had to roll it back. (#198
>>> <https://github.com/apache/incubator-beam/pull/198>)
>>>
>>>
>>> Once we improve the InProcess runner, we can re-do the
>>> default swap. After the swap, once the tests keep
>>> passing for a few days, we do indeed intend to delete
>>> the current Direct pipeline runner and replace it with
>>> the current InProcess runner.
>>>
>>> Dan
>>>
>>>
>>> Regards
>>> JB
>>>
>>> On 05/02/2016 03:12 AM, Frances Perry wrote:
>>>
>>> +Thomas, author of the InProcessPipelineRunner
>>>
>>> The DirectPipelineRunner doesn't yet support
>>> unbounded PCollections. You
>>> can try using the InProcessPipelineRunner,
>>> which is the re-write of
>>> local execution that provides support for
>>> unbounded PCollections and
>>> better checking against the Beam Model. (We'll
>>> be renaming this to the
>>> DirectPipelineRunner in the near future to
>>> avoid having both as soon as
>>> the functionality of the
>>> InProcessPipelineRunner is complete.)
>>>
>>> On Sun, May 1, 2016 at 4:38 PM, amir
>>> bahmanyari <amirtousa@yahoo.com
>>> <ma...@yahoo.com>
>>> <mailto:amirtousa@yahoo.com
>>>
>>> <ma...@yahoo.com>>> wrote:
>>>
>>> Hi JB,
>>> I rebuilt my code with the latest :
>>> kafka-0.1.0-incubating-20160501.070733-11.jar
>>>
>>> <
>>> https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar
>>> >
>>>
>>>
>>> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>>>
>>> <
>>> https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>>> >
>>>
>>>
>>> Tried _without setting withMaxNumRecords()_:
>>> Throws java.lang.IllegalStateException: no evaluator registered for
>>> Read(UnboundedKafkaSource)
>>> at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>
>>> _With setting withMaxNumRecords()_, I see the thread is running, no
>>> exceptions like above, waiting for incoming Kafka data, but the
>>> method obtaining the data from processElement(ProcessContext ctx)
>>> never executes.
>>> Therefore, nothing goes into
>>> apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>>>
>>> I see Kafka Broker reports my laptop IP address as getting a
>>> connection to it, OK.
>>> Everything looks OK at the server side.
>>> Doesn't look like it's my lucky day.
>>> I appreciate any help/feedback/suggestion.
>>> Cheers
>>>
>>>
>>>
>>> ------------------------------------------------------------------------
>>> *From:* Jean-Baptiste Onofré
>>> <jb@nanthrax.net <ma...@nanthrax.net>
>>> <mailto:jb@nanthrax.net <mailto:jb@nanthrax.net
>>> >>>
>>> *To:* user@beam.incubator.apache.org
>>> <ma...@beam.incubator.apache.org>
>>> <mailto:user@beam.incubator.apache.org
>>>
>>> <ma...@beam.incubator.apache.org>>
>>> *Sent:* Friday, April 29, 2016 10:36 PM
>>> *Subject:* Re: KafkaIO Usage & Sample Code
>>>
>>>
>>> As I said in my previous e-mail, until
>>> recently DirectPipelineRunner
>>> didn't support Unbounded.
>>>
>>> It's now fixed, so if you take a latest
>>> nightly build, or build master,
>>> it should work.
>>>
>>> As workaround, you can also limit the
>>> number of message consumed from
>>> Kafka (and so work with bounded).
>>>
>>> Regards
>>> JB
>>>
>>> On 04/29/2016 07:12 PM, amir bahmanyari
>>> wrote:
>>> > Hi colleagues,
>>> > I am moving this conversation to this users mailing list as per Max’s
>>> > suggestion.
>>> > Thanks Max.
>>> > Hi JB,
>>> > Hope all is great.
>>> > Is there a resolution to the exception I sent last night pls?
>>> > When would the sample code to use KafkaIO be released?
>>> > I really appreciate your valuable time. Below is the exception for your
>>> > reference.
>>> > This is how it gets used in my code:
>>> >
>>> > p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>>> >
>>> > Have a wonderful weekend.
>>> > Exception in thread "main" java.lang.IllegalStateException: no evaluator
>>> > registered for Read(UnboundedKafkaSource)
>>> > at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>> > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>> > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>> > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>> > at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>>> > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>>> > at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>>> > at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>>> > at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>>> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>>> > at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>>> > Kind Regards,
>>> > Amir
>>>
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbonofre@apache.org
>>> <ma...@apache.org>
>>> <mailto:jbonofre@apache.org
>>> <ma...@apache.org>>
>>> http://blog.nanthrax.net
>>> <http://blog.nanthrax.net/>
>>> <http://blog.nanthrax.net/>
>>> Talend - http://www.talend.com
>>> <http://www.talend.com/> <http://www.talend.com/
>>> >
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbonofre@apache.org <ma...@apache.org>
>>> http://blog.nanthrax.net <http://blog.nanthrax.net/>
>>> Talend - http://www.talend.com
>>> <http://www.talend.com/>
>>>
>>>
>>>
>>
>>
>>
>>
>>
>>
>>
>>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
Re: KafkaIO Usage & Sample Code
Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Thomas meant that you have to check out this commit or a later one:
git checkout b2b77e380
;)
Regards
JB
On 05/02/2016 07:52 PM, amir bahmanyari wrote:
> Hi Dan,
> Sorry! I honestly don't know what "so long as you're on a commit after
> b2b77e380" means :)))
> If this means there is a specific jar file I need to have on my path,
> could you point me to a link where I can get the right jar file pls?
> I appreciate it, sir.
> Amir
>
>
> ------------------------------------------------------------------------
> *From:* Thomas Groh <tg...@google.com>
> *To:* user@beam.incubator.apache.org
> *Sent:* Monday, May 2, 2016 10:02 AM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> Your calls should work - so long as you're on a commit after b2b77e380
> (when we started implementing PipelineRunner), the
> InProcessPipelineRunner should be a valid argument to
> PipelineOptions#setRunner
>
> As an example, there's the InProcessPipelineRunnerTest
> (https://github.com/apache/incubator-beam/blob/b9116ac426f989af882e6df5dafc5da6c9f203d8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java#L73)
>
> On Mon, May 2, 2016 at 9:43 AM, Dan Halperin <dhalperi@google.com
> <ma...@google.com>> wrote:
>
> Hi Amir,
>
> The problem is likely in using DataflowPipelineOptions.class -- this
> is specific to the Cloud Dataflow service and the
> DataflowPipelineRunner. Try using just "PipelineOptions".
>
> Dan
>
> On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <amirtousa@yahoo.com
> <ma...@yahoo.com>> wrote:
>
> Thanks Dan.
> I actually had tried it before but got compilation errors at
> setting the InProcessPipelineRunner in the PipelineOptions object..
> I appreciate it if you point me to a working sample code.
> FYI, This is my implementation:
> import com.google.cloud.dataflow.sdk.options.PipelineOptions;
> import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
> DataflowPipelineOptions Myoptions =
> PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
> Myoptions.setRunner(InProcessPipelineRunner.class);
>
> I cannot set runner as InProcessPipelineRunner in the last line:
> The method setRunner(Class<? extends PipelineRunner<?>>) in the
> type PipelineOptions is not applicable for the arguments
> (Class<InProcessPipelineRunner>).
> Thanks for your help.
> Amir-
>
>
> ------------------------------------------------------------------------
> *From:* Dan Halperin <dhalperi@google.com
> <ma...@google.com>>
> *To:* user@beam.incubator.apache.org
> <ma...@beam.incubator.apache.org>
> *Sent:* Monday, May 2, 2016 12:23 AM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> On Mon, May 2, 2016 at 12:22 AM, Dan Halperin
> <dhalperi@google.com <ma...@google.com>> wrote:
>
> Hi Amir,
>
> As Frances suggested, you can use the
> InProcessPipelineRunner instead of the DirectPipelineRunner
> to execute your pipeline. (They're both in the codebase,
> it's just that the Direct runner is the default. Use the
> --runner command line option.)
>
>
> Amending: it is relatively unlikely that the issues that we
> caught in testing would affect you. So it should be safe for
> your use case to do this -- and definitely safe to at least try
> it out!
>
>
> Dan
>
> On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari
> <amirtousa@yahoo.com <ma...@yahoo.com>> wrote:
>
> Thanks gents
> What are our options in the meanwhile?
> Cheers
>
> Sent from my iPhone
>
> On May 2, 2016, at 12:00 AM, Dan Halperin
> <dhalperi@google.com <ma...@google.com>> wrote:
>
>> On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré
>> <jb@nanthrax.net <ma...@nanthrax.net>> wrote:
>>
>> Oh, thanks Frances.
>>
>> I mixed DirectPipelineRunner ("old" local runner),
>> and InProcessPipelineRunner ("new" local runner) ;)
>>
>> We should remove the DirectPipelineRunner to avoid
>> confusion. WDYT ?
>>
>>
>> We would like to do this soon, but there are some snags.
>>
>> As a preparation step, Thomas swapped the default
>> runner from Direct to InProcess. (#178
>> <https://github.com/apache/incubator-beam/pull/178>)
>>
>> However, testing unfortunately exposed some issues
>> with the InProcess runner. (Actually, I should say
>> "fortunately" because the tests caught it! Yay!) So we
>> had to roll it back. (#198
>> <https://github.com/apache/incubator-beam/pull/198>)
>>
>> Once we improve the InProcess runner, we can re-do the
>> default swap. After the swap, once the tests keep
>> passing for a few days, we do indeed intend to delete
>> the current Direct pipeline runner and replace it with
>> the current InProcess runner.
>>
>> Dan
>>
>>
>> Regards
>> JB
>>
>> On 05/02/2016 03:12 AM, Frances Perry wrote:
>>
>> +Thomas, author of the InProcessPipelineRunner
>>
>> The DirectPipelineRunner doesn't yet support
>> unbounded PCollections. You
>> can try using the InProcessPipelineRunner,
>> which is the re-write of
>> local execution that provides support for
>> unbounded PCollections and
>> better checking against the Beam Model. (We'll
>> be renaming this to the
>> DirectPipelineRunner in the near future to
>> avoid having both as soon as
>> the functionality of the
>> InProcessPipelineRunner is complete.)
>>
>> On Sun, May 1, 2016 at 4:38 PM, amir
>> bahmanyari <amirtousa@yahoo.com
>> <ma...@yahoo.com>
>> <mailto:amirtousa@yahoo.com
>> <ma...@yahoo.com>>> wrote:
>>
>> Hi JB,
>> I rebuilt my code with the latest :
>> kafka-0.1.0-incubating-20160501.070733-11.jar
>>
>> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
>>
>> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>>
>> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
>>
>>
>> Tried _without setting withMaxNumRecords()_:
>> Throws java.lang.IllegalStateException: no evaluator registered for
>> Read(UnboundedKafkaSource)
>> at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>
>> _With setting withMaxNumRecords()_, I see the thread is running, no
>> exceptions like above, waiting for incoming Kafka data, but the
>> method obtaining the data from processElement(ProcessContext ctx)
>> never executes.
>> Therefore, nothing goes into
>> apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>>
>> I see Kafka Broker reports my laptop IP address as getting a
>> connection to it, OK.
>> Everything looks OK at the server side.
>> Doesn't look like it's my lucky day.
>> I appreciate any help/feedback/suggestion.
>> Cheers
>>
>>
>> ------------------------------------------------------------------------
>> *From:* Jean-Baptiste Onofré
>> <jb@nanthrax.net <ma...@nanthrax.net>
>> <mailto:jb@nanthrax.net <ma...@nanthrax.net>>>
>> *To:* user@beam.incubator.apache.org
>> <ma...@beam.incubator.apache.org>
>> <mailto:user@beam.incubator.apache.org
>> <ma...@beam.incubator.apache.org>>
>> *Sent:* Friday, April 29, 2016 10:36 PM
>> *Subject:* Re: KafkaIO Usage & Sample Code
>>
>>
>> As I said in my previous e-mail, until
>> recently DirectPipelineRunner
>> didn't support Unbounded.
>>
>> It's now fixed, so if you take a latest
>> nightly build, or build master,
>> it should work.
>>
>> As workaround, you can also limit the
>> number of message consumed from
>> Kafka (and so work with bounded).
>>
>> Regards
>> JB
>>
>> On 04/29/2016 07:12 PM, amir bahmanyari wrote:
>> > Hi colleagues,
>> > I am moving this conversation to this users mailing list as per Max’s
>> > suggestion.
>> > Thanks Max.
>> > Hi JB,
>> > Hope all is great.
>> > Is there a resolution to the exception I sent last night pls?
>> > When would the sample code to use KafkaIO be released?
>> > I really appreciate your valuable time. Below is the exception for your
>> > reference.
>> > This is how it gets used in my code:
>> >
>> > p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>> >
>> > Have a wonderful weekend.
>> > Exception in thread "main" java.lang.IllegalStateException: no evaluator
>> > registered for Read(UnboundedKafkaSource)
>> > at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>> > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>> > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>> > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>> > at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>> > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>> > at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>> > at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>> > at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>> > at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>> > Kind Regards,
>> > Amir
>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbonofre@apache.org
>> <ma...@apache.org>
>> <mailto:jbonofre@apache.org
>> <ma...@apache.org>>
>> http://blog.nanthrax.net
>> <http://blog.nanthrax.net/>
>> <http://blog.nanthrax.net/>
>> Talend - http://www.talend.com
>> <http://www.talend.com/> <http://www.talend.com/>
>>
>>
>>
>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbonofre@apache.org <ma...@apache.org>
>> http://blog.nanthrax.net <http://blog.nanthrax.net/>
>> Talend - http://www.talend.com
>> <http://www.talend.com/>
>>
>>
>
>
>
>
>
>
>
>
--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
Re: KafkaIO Usage & Sample Code
Posted by amir bahmanyari <am...@yahoo.com>.
Hi Dan,
Sorry! I honestly don't know what "so long as you're on a commit after b2b77e380" means :)))
If this means there is a specific jar file I need to have on my path, could you point me to a link where I can get the right jar file pls?
I appreciate it, sir.
Amir
From: Thomas Groh <tg...@google.com>
To: user@beam.incubator.apache.org
Sent: Monday, May 2, 2016 10:02 AM
Subject: Re: KafkaIO Usage & Sample Code
Your calls should work - so long as you're on a commit after b2b77e380 (when we started implementing PipelineRunner), the InProcessPipelineRunner should be a valid argument to PipelineOptions#setRunner
As an example, there's the InProcessPipelineRunnerTest (https://github.com/apache/incubator-beam/blob/b9116ac426f989af882e6df5dafc5da6c9f203d8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java#L73)
On Mon, May 2, 2016 at 9:43 AM, Dan Halperin <dh...@google.com> wrote:
Hi Amir,
The problem is likely in using DataflowPipelineOptions.class -- this is specific to the Cloud Dataflow service and the DataflowPipelineRunner. Try using just "PipelineOptions".
Dan
On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <am...@yahoo.com> wrote:
Thanks Dan.
I actually had tried it before but got compilation errors at setting the InProcessPipelineRunner in the PipelineOptions object.
I appreciate it if you point me to a working sample code.
FYI, this is my implementation:
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
DataflowPipelineOptions Myoptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
Myoptions.setRunner(InProcessPipelineRunner.class);
I cannot set runner as InProcessPipelineRunner in the last line:
The method setRunner(Class<? extends PipelineRunner<?>>) in the type PipelineOptions is not applicable for the arguments (Class<InProcessPipelineRunner>).
Thanks for your help.
Amir-
From: Dan Halperin <dh...@google.com>
To: user@beam.incubator.apache.org
Sent: Monday, May 2, 2016 12:23 AM
Subject: Re: KafkaIO Usage & Sample Code
On Mon, May 2, 2016 at 12:22 AM, Dan Halperin <dh...@google.com> wrote:
Hi Amir,
As Frances suggested, you can use the InProcessPipelineRunner instead of the DirectPipelineRunner to execute your pipeline. (They're both in the codebase, it's just that the Direct runner is the default. Use the --runner command line option.)
Amending: it is relatively unlikely that the issues that we caught in testing would affect you. So it should be safe for your use case to do this -- and definitely safe to at least try it out!
Dan
On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari <am...@yahoo.com> wrote:
Thanks gents
What are our options in the meanwhile?
Cheers
Sent from my iPhone
On May 2, 2016, at 12:00 AM, Dan Halperin <dh...@google.com> wrote:
On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
Oh, thanks Frances.
I mixed DirectPipelineRunner ("old" local runner), and InProcessPipelineRunner ("new" local runner) ;)
We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
We would like to do this soon, but there are some snags.
As a preparation step, Thomas swapped the default runner from Direct to InProcess. (#178)
However, testing unfortunately exposed some issues with the InProcess runner. (Actually, I should say "fortunately" because the tests caught it! Yay!) So we had to roll it back. (#198)
Once we improve the InProcess runner, we can re-do the default swap. After the swap, once the tests keep passing for a few days, we do indeed intend to delete the current Direct pipeline runner and replace it with the current InProcess runner.
Dan
Regards
JB
On 05/02/2016 03:12 AM, Frances Perry wrote:
+Thomas, author of the InProcessPipelineRunner
The DirectPipelineRunner doesn't yet support unbounded PCollections. You
can try using the InProcessPipelineRunner, which is the re-write of
local execution that provides support for unbounded PCollections and
better checking against the Beam Model. (We'll be renaming this to the
DirectPipelineRunner in the near future to avoid having both as soon as
the functionality of the InProcessPipelineRunner is complete.)
On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <amirtousa@yahoo.com
<ma...@yahoo.com>> wrote:
Hi JB,
I rebuilt my code with the latest :
kafka-0.1.0-incubating-20160501.070733-11.jar
<https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
<https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
Tried _without setting withMaxNumRecords()_:
Throws java.lang.IllegalStateException: no evaluator registered for
Read(UnboundedKafkaSource)
at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
_With setting withMaxNumRecords()_, I see the thread is running, no
exceptions like above, waiting for incoming Kafka data, but the
method obtaining the data from processElement(ProcessContext ctx)
never executes.
Therefore, nothing goes into
apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
I see Kafka Broker reports my laptop IP address as getting a
connection to it, OK.
Everything looks OK at the server side.
Doesn't look like it's my lucky day.
I appreciate any help/feedback/suggestion.
Cheers
------------------------------------------------------------------------
*From:* Jean-Baptiste Onofré <jb@nanthrax.net <ma...@nanthrax.net>>
*To:* user@beam.incubator.apache.org
<ma...@beam.incubator.apache.org>
*Sent:* Friday, April 29, 2016 10:36 PM
*Subject:* Re: KafkaIO Usage & Sample Code
As I said in my previous e-mail, until recently DirectPipelineRunner
didn't support Unbounded.
It's now fixed, so if you take a latest nightly build, or build master,
it should work.
As workaround, you can also limit the number of message consumed from
Kafka (and so work with bounded).
Regards
JB
On 04/29/2016 07:12 PM, amir bahmanyari wrote:
> Hi colleagues,
> I am moving this conversation to this users mailing list as per Max’s
> suggestion.
> Thanks Max.
> Hi JB,
> Hope all is great.
> Is there a resolution to the exception I sent last night pls?
> When would the sample code to use KafkaIO be released?
> I really appreciate your valuable time. Below is the exception for your
> reference.
> This is how it gets used in my code:
>
> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>
> Have a wonderful weekend.
> Exception in thread "main" java.lang.IllegalStateException: no evaluator
> registered for Read(UnboundedKafkaSource)
> at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> Kind Regards,
> Amir
--
Jean-Baptiste Onofré
jbonofre@apache.org <ma...@apache.org>
http://blog.nanthrax.net <http://blog.nanthrax.net/>
Talend - http://www.talend.com <http://www.talend.com/>
--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
Re: KafkaIO Usage & Sample Code
Posted by Thomas Groh <tg...@google.com>.
Your calls should work - so long as you're on a commit after b2b77e380
(when we started implementing PipelineRunner), the InProcessPipelineRunner
should be a valid argument to PipelineOptions#setRunner
As an example, there's the InProcessPipelineRunnerTest (
https://github.com/apache/incubator-beam/blob/b9116ac426f989af882e6df5dafc5da6c9f203d8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java#L73
)
On Mon, May 2, 2016 at 9:43 AM, Dan Halperin <dh...@google.com> wrote:
> Hi Amir,
>
> The problem is likely in using DataflowPipelineOptions.class -- this is
> specific to the Cloud Dataflow service and the DataflowPipelineRunner. Try
> using just "PipelineOptions".
>
> Dan
>
> On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <am...@yahoo.com>
> wrote:
>
>> Thanks Dan.
>> I actually had tried it before but got compilation errors at setting the InProcessPipelineRunner
>> in the PipelineOptions object..
>> I appreciate it if you point me to a working sample code.
>> FYI, This is my implementation:
>> import com.google.cloud.dataflow.sdk.options.PipelineOptions;
>> import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
>> DataflowPipelineOptions Myoptions =
>> PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
>> Myoptions.setRunner(InProcessPipelineRunner.class);
>>
>> I cannot set runner as InProcessPipelineRunner in the last line:
>> The method setRunner(Class<? extends PipelineRunner<?>>) in the type
>> PipelineOptions is not applicable for the arguments
>> (Class<InProcessPipelineRunner>).
>> Thanks for your help.
>> Amir-
>>
>>
>> ------------------------------
>> *From:* Dan Halperin <dh...@google.com>
>> *To:* user@beam.incubator.apache.org
>> *Sent:* Monday, May 2, 2016 12:23 AM
>> *Subject:* Re: KafkaIO Usage & Sample Code
>>
>> On Mon, May 2, 2016 at 12:22 AM, Dan Halperin <dh...@google.com>
>> wrote:
>>
>> Hi Amir,
>>
>> As Frances suggested, you can use the InProcessPipelineRunner instead of
>> the DirectPipelineRunner to execute your pipeline. (They're both in the
>> codebase, it's just that the Direct runner is the default. Use the --runner
>> command line option.)
>>
>>
>> Amending: it is relatively unlikely that the issues that we caught in
>> testing would affect you. So it should be safe for your use case to do this
>> -- and definitely safe to at least try it out!
>>
>>
>>
>> Dan
>>
>> On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari <am...@yahoo.com>
>> wrote:
>>
>> Thanks gents
>> What are our options in the meanwhile?
>> Cheers
>>
>> Sent from my iPhone
>>
>> On May 2, 2016, at 12:00 AM, Dan Halperin <dh...@google.com> wrote:
>>
>> On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <jb...@nanthrax.net>
>> wrote:
>>
>> Oh, thanks Frances.
>>
>> I mixed DirectPipelineRunner ("old" local runner), and
>> InProcessPipelineRunner ("new" local runner) ;)
>>
>> We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
>>
>>
>> We would like to do this soon, but there are some snags.
>>
>> As a preparation step, Thomas swapped the default runner from Direct to
>> InProcess. (#178 <https://github.com/apache/incubator-beam/pull/178>)
>>
>> However, testing unfortunately exposed some issues with the InProcess
>> runner. (Actually, I should say "fortunately" because the tests caught it!
>> Yay!) So we had to roll it back. (#198
>> <https://github.com/apache/incubator-beam/pull/198>)
>>
>> Once we improve the InProcess runner, we can re-do the default swap.
>> After the swap, once the tests keep passing for a few days, we do indeed
>> intend to delete the current Direct pipeline runner and replace it with the
>> current InProcess runner.
>>
>> Dan
>>
>>
>>
>> Regards
>> JB
>>
>> On 05/02/2016 03:12 AM, Frances Perry wrote:
>>
>> +Thomas, author of the InProcessPipelineRunner
>>
>> The DirectPipelineRunner doesn't yet support unbounded PCollections. You
>> can try using the InProcessPipelineRunner, which is the re-write of
>> local execution that provides support for unbounded PCollections and
>> better checking against the Beam Model. (We'll be renaming this to the
>> DirectPipelineRunner in the near future to avoid having both as soon as
>> the functionality of the InProcessPipelineRunner is complete.)
>>
>> On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <amirtousa@yahoo.com
>> <ma...@yahoo.com>> wrote:
>>
>> Hi JB,
>> I rebuilt my code with the latest :
>> kafka-0.1.0-incubating-20160501.070733-11.jar
>> <
>> https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar
>> >
>> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>> <
>> https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>> >
>>
>>
>> Tried _without setting withMaxNumRecords()_:
>> Throws java.lang.IllegalStateException: no evaluator registered for
>> Read(UnboundedKafkaSource)
>> at
>>
>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>> at
>>
>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>> at
>>
>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>
>> _With setting withMaxNumRecords()_, I see the thread is running, no
>> exceptions like above, waiting for incoming Kafka data, but the
>> method obtaining the data from processElement(ProcessContext ctx)
>> never executes.
>> Therefore, nothing goes into
>> apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>>
>> I see Kafka Broker reports my laptop IP address as getting a
>> connection to it, OK.
>> Everything looks OK at the server side.
>> Doesn't look like it's my lucky day.
>> I appreciate any help/feedback/suggestion.
>> Cheers
>>
>>
>> ------------------------------------------------------------------------
>> *From:* Jean-Baptiste Onofré <jb@nanthrax.net <mailto:jb@nanthrax.net
>> >>
>> *To:* user@beam.incubator.apache.org
>> <ma...@beam.incubator.apache.org>
>> *Sent:* Friday, April 29, 2016 10:36 PM
>> *Subject:* Re: KafkaIO Usage & Sample Code
>>
>>
>> As I said in my previous e-mail, until recently DirectPipelineRunner
>> didn't support Unbounded.
>>
>> It's now fixed, so if you take a latest nightly build, or build
>> master,
>> it should work.
>>
>> As workaround, you can also limit the number of message consumed from
>> Kafka (and so work with bounded).
>>
>> Regards
>> JB
>>
>> On 04/29/2016 07:12 PM, amir bahmanyari wrote:
>> > Hi colleagues,
>> > I am moving this conversation to this users mailing list as per
>> Max’s
>> > suggestion.
>> > Thanks Max.
>> > Hi JB,
>> > Hope all is great.
>> > Is there a resolution to the exception I sent last night pls?
>> > When would the sample code to use KafkaIO be released?
>> > I really appreciate your valuable time. Below is the exception
>> for your
>> > reference.
>> > This is how it gets used in my code:
>> >
>> >
>>
>> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>> >
>> > Have a wonderful weekend.
>> > Exception in thread "main" java.lang.IllegalStateException: no
>> evaluator
>> > registered for Read(UnboundedKafkaSource)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>> > at
>> >
>> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>> > at
>> >
>>
>> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>> > Kind Regards,
>> > Amir
>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbonofre@apache.org <ma...@apache.org>
>> http://blog.nanthrax.net <http://blog.nanthrax.net/>
>> Talend - http://www.talend.com <http://www.talend.com/>
>>
>>
>>
>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbonofre@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>>
>>
>>
>>
>>
>>
>
Re: KafkaIO Usage & Sample Code
Posted by Dan Halperin <dh...@google.com>.
Hi Amir,
The problem is likely in using DataflowPipelineOptions.class -- this is
specific to the Cloud Dataflow service and the DataflowPipelineRunner. Try
using just "PipelineOptions".
Dan
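A note for readers who hit the same compile error: a message of the form "setRunner(Class<? extends PipelineRunner<?>>) ... is not applicable for the arguments (Class<InProcessPipelineRunner>)" generally means the runner class being passed is not a subtype of the PipelineRunner type that the options interface was compiled against. The sketch below reproduces that situation in plain Java with stand-in classes. Every name in it (OldRunner, NewRunner, InProcessRunner, OldOptions, NewOptions) is hypothetical, and none of it is Beam or Dataflow SDK code. Whether this is what happened in the code quoted below is an assumption; the thread does not confirm it.

```java
public class RunnerBoundSketch {
  // Stand-ins for two unrelated runner base types, for example the same
  // class name living in two different SDK package trees (hypothetical).
  static class OldRunner<R> {}
  static class NewRunner<R> {}

  // Hypothetical runner that belongs to the "new" hierarchy only.
  static class InProcessRunner extends NewRunner<String> {}

  // Options type whose setter is bounded by the "old" runner type.
  static class OldOptions {
    void setRunner(Class<? extends OldRunner<?>> runner) {}
  }

  // Options type whose setter is bounded by the "new" runner type.
  static class NewOptions {
    private Class<? extends NewRunner<?>> runner;

    void setRunner(Class<? extends NewRunner<?>> runner) {
      this.runner = runner;
    }

    Class<? extends NewRunner<?>> getRunner() {
      return runner;
    }
  }

  public static String configuredRunnerName() {
    NewOptions options = new NewOptions();
    // Compiles: InProcessRunner is a subtype of NewRunner<?>.
    options.setRunner(InProcessRunner.class);

    // Would NOT compile, because InProcessRunner is not an OldRunner:
    // new OldOptions().setRunner(InProcessRunner.class);

    return options.getRunner().getSimpleName();
  }

  public static void main(String[] args) {
    System.out.println(configuredRunnerName());
  }
}
```

If this is the cause, taking the options interface and the runner class from the same SDK package tree should let the setRunner call compile.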
On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <am...@yahoo.com> wrote:
> Thanks Dan.
> I actually had tried it before but got compilation errors at setting the InProcessPipelineRunner
> in the PipelineOptions object.
> I would appreciate it if you could point me to working sample code.
> FYI, This is my implementation:
> import com.google.cloud.dataflow.sdk.options.PipelineOptions;
> import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
> DataflowPipelineOptions Myoptions =
> PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
> Myoptions.setRunner(InProcessPipelineRunner.class);
>
> I cannot set runner as InProcessPipelineRunner in the last line:
> The method setRunner(Class<? extends PipelineRunner<?>>) in the type
> PipelineOptions is not applicable for the arguments
> (Class<InProcessPipelineRunner>).
> Thanks for your help.
> Amir-
>
>
> ------------------------------
> *From:* Dan Halperin <dh...@google.com>
> *To:* user@beam.incubator.apache.org
> *Sent:* Monday, May 2, 2016 12:23 AM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> On Mon, May 2, 2016 at 12:22 AM, Dan Halperin <dh...@google.com> wrote:
>
> Hi Amir,
>
> As Frances suggested, you can use the InProcessPipelineRunner instead of
> the DirectPipelineRunner to execute your pipeline. (They're both in the
> codebase, it's just that the Direct runner is the default. Use the --runner
> command line option.)
>
>
> Amending: it is relatively unlikely that the issues that we caught in
> testing would affect you. So it should be safe for your use case to do this
> -- and definitely safe to at least try it out!
>
>
>
> Dan
>
> On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari <am...@yahoo.com>
> wrote:
>
> Thanks gents
> What are our options in the meanwhile?
> Cheers
>
> Sent from my iPhone
>
> On May 2, 2016, at 12:00 AM, Dan Halperin <dh...@google.com> wrote:
>
> On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <jb...@nanthrax.net>
> wrote:
>
> Oh, thanks Frances.
>
> I mixed DirectPipelineRunner ("old" local runner), and
> InProcessPipelineRunner ("new" local runner) ;)
>
> We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
>
>
> We would like to do this soon, but there are some snags.
>
> As a preparation step, Thomas swapped the default runner from Direct to
> InProcess. (#178 <https://github.com/apache/incubator-beam/pull/178>)
>
> However, testing unfortunately exposed some issues with the InProcess
> runner. (Actually, I should say "fortunately" because the tests caught it!
> Yay!) So we had to roll it back. (#198
> <https://github.com/apache/incubator-beam/pull/198>)
>
> Once we improve the InProcess runner, we can re-do the default swap. After
> the swap, once the tests keep passing for a few days, we do indeed intend
> to delete the current Direct pipeline runner and replace it with the
> current InProcess runner.
>
> Dan
>
>
>
> Regards
> JB
>
> On 05/02/2016 03:12 AM, Frances Perry wrote:
>
> +Thomas, author of the InProcessPipelineRunner
>
> The DirectPipelineRunner doesn't yet support unbounded PCollections. You
> can try using the InProcessPipelineRunner, which is the re-write of
> local execution that provides support for unbounded PCollections and
> better checking against the Beam Model. (We'll be renaming this to the
> DirectPipelineRunner in the near future to avoid having both as soon as
> the functionality of the InProcessPipelineRunner is complete.)
>
> On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <amirtousa@yahoo.com
> <ma...@yahoo.com>> wrote:
>
> Hi JB,
> I rebuilt my code with the latest :
> kafka-0.1.0-incubating-20160501.070733-11.jar
> <
> https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar
> >
> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
> <
> https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
> >
>
>
> Tried _without setting withMaxNumRecords()_:
> Throws java.lang.IllegalStateException: no evaluator registered for
> Read(UnboundedKafkaSource)
> at
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> at
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> at
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>
> _With setting withMaxNumRecords()_, I see the thread is running, no
> exceptions like above, waiting for incoming Kafka data, but the
> method obtaining the data from processElement(ProcessContext ctx)
> never executes.
> Therefore, nothing goes into
> apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>
> I see Kafka Broker reports my laptop IP address as getting a
> connection to it, OK.
> Everything looks OK at the server side.
> Doesn't look like it's my lucky day.
> I appreciate any help/feedback/suggestion.
> Cheers
>
>
> ------------------------------------------------------------------------
> *From:* Jean-Baptiste Onofré <jb@nanthrax.net <mailto:jb@nanthrax.net
> >>
> *To:* user@beam.incubator.apache.org
> <ma...@beam.incubator.apache.org>
> *Sent:* Friday, April 29, 2016 10:36 PM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
>
> As I said in my previous e-mail, until recently DirectPipelineRunner
> didn't support Unbounded.
>
> It's now fixed, so if you take a latest nightly build, or build master,
> it should work.
>
> As workaround, you can also limit the number of message consumed from
> Kafka (and so work with bounded).
>
> Regards
> JB
>
> On 04/29/2016 07:12 PM, amir bahmanyari wrote:
> > Hi colleagues,
> > I am moving this conversation to this users mailing list as per
> Max’s
> > suggestion.
> > Thanks Max.
> > Hi JB,
> > Hope all is great.
> > Is there a resolution to the exception I sent last night pls?
> > When would the sample code to use KafkaIO be released?
> > I really appreciate your valuable time. Below is the exception
> for your
> > reference.
> > This is how it gets used in my code:
> >
> >
>
> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
> >
> > Have a wonderful weekend.
> > Exception in thread "main" java.lang.IllegalStateException: no
> evaluator
> > registered for Read(UnboundedKafkaSource)
> > at
> >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> > at
> >
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> > at
> >
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> > at
> >
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> > at
> >
>
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> > at
> >
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> > at
> >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> > at
> >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> > at
> >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> > at
> >
> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> > Kind Regards,
> > Amir
>
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org <ma...@apache.org>
> http://blog.nanthrax.net <http://blog.nanthrax.net/>
> Talend - http://www.talend.com <http://www.talend.com/>
>
>
>
>
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
>
>
>
>
>
Re: KafkaIO Usage & Sample Code
Posted by amir bahmanyari <am...@yahoo.com>.
Thanks Dan.
I actually had tried it before but got compilation errors at setting the InProcessPipelineRunner in the PipelineOptions object.
I would appreciate it if you could point me to working sample code.
FYI, this is my implementation:
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
DataflowPipelineOptions Myoptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
Myoptions.setRunner(InProcessPipelineRunner.class);
I cannot set runner as InProcessPipelineRunner in the last line:
The method setRunner(Class<? extends PipelineRunner<?>>) in the type PipelineOptions is not applicable for the arguments (Class<InProcessPipelineRunner>).
Thanks for your help.
Amir-
From: Dan Halperin <dh...@google.com>
To: user@beam.incubator.apache.org
Sent: Monday, May 2, 2016 12:23 AM
Subject: Re: KafkaIO Usage & Sample Code
On Mon, May 2, 2016 at 12:22 AM, Dan Halperin <dh...@google.com> wrote:
Hi Amir,
As Frances suggested, you can use the InProcessPipelineRunner instead of the DirectPipelineRunner to execute your pipeline. (They're both in the codebase, it's just that the Direct runner is the default. Use the --runner command line option.)
Amending: it is relatively unlikely that the issues that we caught in testing would affect you. So it should be safe for your use case to do this -- and definitely safe to at least try it out!
Dan
On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari <am...@yahoo.com> wrote:
Thanks gents
What are our options in the meanwhile?
Cheers
Sent from my iPhone
On May 2, 2016, at 12:00 AM, Dan Halperin <dh...@google.com> wrote:
On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
Oh, thanks Frances.
I mixed DirectPipelineRunner ("old" local runner), and InProcessPipelineRunner ("new" local runner) ;)
We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
We would like to do this soon, but there are some snags.
As a preparation step, Thomas swapped the default runner from Direct to InProcess. (#178)
However, testing unfortunately exposed some issues with the InProcess runner. (Actually, I should say "fortunately" because the tests caught it! Yay!) So we had to roll it back. (#198)
Once we improve the InProcess runner, we can re-do the default swap. After the swap, once the tests keep passing for a few days, we do indeed intend to delete the current Direct pipeline runner and replace it with the current InProcess runner.
Dan
Regards
JB
On 05/02/2016 03:12 AM, Frances Perry wrote:
+Thomas, author of the InProcessPipelineRunner
The DirectPipelineRunner doesn't yet support unbounded PCollections. You
can try using the InProcessPipelineRunner, which is the re-write of
local execution that provides support for unbounded PCollections and
better checking against the Beam Model. (We'll be renaming this to the
DirectPipelineRunner in the near future to avoid having both as soon as
the functionality of the InProcessPipelineRunner is complete.)
On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <amirtousa@yahoo.com
<ma...@yahoo.com>> wrote:
Hi JB,
I rebuilt my code with the latest :
kafka-0.1.0-incubating-20160501.070733-11.jar
<https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
<https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
Tried _without setting withMaxNumRecords()_:
Throws java.lang.IllegalStateException: no evaluator registered for
Read(UnboundedKafkaSource)
at
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
_With setting withMaxNumRecords()_, I see the thread is running, no
exceptions like above, waiting for incoming Kafka data, but the
method obtaining the data from processElement(ProcessContext ctx)
never executes.
Therefore, nothing goes into
apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
I see Kafka Broker reports my laptop IP address as getting a
connection to it, OK.
Everything looks OK at the server side.
Doesn't look like it's my lucky day.
I appreciate any help/feedback/suggestion.
Cheers
------------------------------------------------------------------------
*From:* Jean-Baptiste Onofré <jb@nanthrax.net <ma...@nanthrax.net>>
*To:* user@beam.incubator.apache.org
<ma...@beam.incubator.apache.org>
*Sent:* Friday, April 29, 2016 10:36 PM
*Subject:* Re: KafkaIO Usage & Sample Code
As I said in my previous e-mail, until recently DirectPipelineRunner
didn't support Unbounded.
It's now fixed, so if you take a latest nightly build, or build master,
it should work.
As workaround, you can also limit the number of message consumed from
Kafka (and so work with bounded).
Regards
JB
On 04/29/2016 07:12 PM, amir bahmanyari wrote:
> Hi colleagues,
> I am moving this conversation to this users mailing list as per Max’s
> suggestion.
> Thanks Max.
> Hi JB,
> Hope all is great.
> Is there a resolution to the exception I sent last night pls?
> When would the sample code to use KafkaIO be released?
> I really appreciate your valuable time. Below is the exception
for your
> reference.
> This is how it gets used in my code:
>
>
p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>
> Have a wonderful weekend.
> Exception in thread "main" java.lang.IllegalStateException: no
evaluator
> registered for Read(UnboundedKafkaSource)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> at
>
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> at
>
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> at
>
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> at
>
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> at
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> at
>
benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> Kind Regards,
> Amir
--
Jean-Baptiste Onofré
jbonofre@apache.org <ma...@apache.org>
http://blog.nanthrax.net <http://blog.nanthrax.net/>
Talend - http://www.talend.com <http://www.talend.com/>
--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
Re: KafkaIO Usage & Sample Code
Posted by Dan Halperin <dh...@google.com>.
On Mon, May 2, 2016 at 12:22 AM, Dan Halperin <dh...@google.com> wrote:
> Hi Amir,
>
> As Frances suggested, you can use the InProcessPipelineRunner instead of
> the DirectPipelineRunner to execute your pipeline. (They're both in the
> codebase, it's just that the Direct runner is the default. Use the --runner
> command line option.)
>
Amending: it is relatively unlikely that the issues that we caught in
testing would affect you. So it should be safe for your use case to do this
-- and definitely safe to at least try it out!
>
> Dan
>
> On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari <am...@yahoo.com>
> wrote:
>
>> Thanks gents
>> What are our options in the meanwhile?
>> Cheers
>>
>> Sent from my iPhone
>>
>> On May 2, 2016, at 12:00 AM, Dan Halperin <dh...@google.com> wrote:
>>
>> On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <jb...@nanthrax.net>
>> wrote:
>>
>>> Oh, thanks Frances.
>>>
>>> I mixed DirectPipelineRunner ("old" local runner), and
>>> InProcessPipelineRunner ("new" local runner) ;)
>>>
>>> We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
>>>
>>
>> We would like to do this soon, but there are some snags.
>>
>> As a preparation step, Thomas swapped the default runner from Direct to
>> InProcess. (#178 <https://github.com/apache/incubator-beam/pull/178>)
>>
>> However, testing unfortunately exposed some issues with the InProcess
>> runner. (Actually, I should say "fortunately" because the tests caught it!
>> Yay!) So we had to roll it back. (#198
>> <https://github.com/apache/incubator-beam/pull/198>)
>>
>> Once we improve the InProcess runner, we can re-do the default swap.
>> After the swap, once the tests keep passing for a few days, we do indeed
>> intend to delete the current Direct pipeline runner and replace it with the
>> current InProcess runner.
>>
>> Dan
>>
>>
>>>
>>> Regards
>>> JB
>>>
>>> On 05/02/2016 03:12 AM, Frances Perry wrote:
>>>
>>>> +Thomas, author of the InProcessPipelineRunner
>>>>
>>>> The DirectPipelineRunner doesn't yet support unbounded PCollections. You
>>>> can try using the InProcessPipelineRunner, which is the re-write of
>>>> local execution that provides support for unbounded PCollections and
>>>> better checking against the Beam Model. (We'll be renaming this to the
>>>> DirectPipelineRunner in the near future to avoid having both as soon as
>>>> the functionality of the InProcessPipelineRunner is complete.)
>>>>
>>>> On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <amirtousa@yahoo.com
>>>> <ma...@yahoo.com>> wrote:
>>>>
>>>> Hi JB,
>>>> I rebuilt my code with the latest :
>>>> kafka-0.1.0-incubating-20160501.070733-11.jar
>>>> <
>>>> https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar
>>>> >
>>>> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>>>> <
>>>> https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>>>> >
>>>>
>>>>
>>>> Tried _without setting withMaxNumRecords()_:
>>>> Throws java.lang.IllegalStateException: no evaluator registered for
>>>> Read(UnboundedKafkaSource)
>>>> at
>>>>
>>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>>> at
>>>>
>>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>>> at
>>>>
>>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>>
>>>> _With setting withMaxNumRecords()_, I see the thread is running, no
>>>> exceptions like above, waiting for incoming Kafka data, but the
>>>> method obtaining the data from processElement(ProcessContext ctx)
>>>> never executes.
>>>> Therefore, nothing goes into
>>>> apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>>>>
>>>> I see Kafka Broker reports my laptop IP address as getting a
>>>> connection to it, OK.
>>>> Everything looks OK at the server side.
>>>> Doesn't look like it's my lucky day.
>>>> I appreciate any help/feedback/suggestion.
>>>> Cheers
>>>>
>>>>
>>>> ------------------------------------------------------------------------
>>>> *From:* Jean-Baptiste Onofré <jb@nanthrax.net <mailto:
>>>> jb@nanthrax.net>>
>>>> *To:* user@beam.incubator.apache.org
>>>> <ma...@beam.incubator.apache.org>
>>>> *Sent:* Friday, April 29, 2016 10:36 PM
>>>> *Subject:* Re: KafkaIO Usage & Sample Code
>>>>
>>>>
>>>> As I said in my previous e-mail, until recently DirectPipelineRunner
>>>> didn't support Unbounded.
>>>>
>>>> It's now fixed, so if you take a latest nightly build, or build
>>>> master,
>>>> it should work.
>>>>
>>>> As workaround, you can also limit the number of message consumed
>>>> from
>>>> Kafka (and so work with bounded).
>>>>
>>>> Regards
>>>> JB
>>>>
>>>> On 04/29/2016 07:12 PM, amir bahmanyari wrote:
>>>> > Hi colleagues,
>>>> > I am moving this conversation to this users mailing list as per
>>>> Max’s
>>>> > suggestion.
>>>> > Thanks Max.
>>>> > Hi JB,
>>>> > Hope all is great.
>>>> > Is there a resolution to the exception I sent last night pls?
>>>> > When would the sample code to use KafkaIO be released?
>>>> > I really appreciate your valuable time. Below is the exception
>>>> for your
>>>> > reference.
>>>> > This is how it gets used in my code:
>>>> >
>>>> >
>>>>
>>>> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>>>> >
>>>> > Have a wonderful weekend.
>>>> > Exception in thread "main" java.lang.IllegalStateException: no
>>>> evaluator
>>>> > registered for Read(UnboundedKafkaSource)
>>>> > at
>>>> >
>>>>
>>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>>> > at
>>>> >
>>>>
>>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>>> > at
>>>> >
>>>>
>>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>> > at
>>>> >
>>>>
>>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>> > at
>>>> >
>>>>
>>>> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>>>> > at
>>>> >
>>>> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>>>> > at
>>>> >
>>>>
>>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>>>> > at
>>>> >
>>>>
>>>> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>>>> > at
>>>> >
>>>>
>>>> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>>>> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>>>> > at
>>>> >
>>>>
>>>> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>>>> > Kind Regards,
>>>> > Amir
>>>>
>>>>
>>>> --
>>>> Jean-Baptiste Onofré
>>>> jbonofre@apache.org <ma...@apache.org>
>>>> http://blog.nanthrax.net <http://blog.nanthrax.net/>
>>>> Talend - http://www.talend.com <http://www.talend.com/>
>>>>
>>>>
>>>>
>>>>
>>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbonofre@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>
>>
>
Re: KafkaIO Usage & Sample Code
Posted by Dan Halperin <dh...@google.com>.
Hi Amir,
As Frances suggested, you can use the InProcessPipelineRunner instead of
the DirectPipelineRunner to execute your pipeline. (They're both in the
codebase, it's just that the Direct runner is the default. Use the --runner
command line option.)
Dan
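To illustrate the --runner suggestion above, here is a minimal sketch of how a --runner=<name> argument can override a default runner choice. It is plain Java with a hypothetical lookup table (RunnerFlagSketch, knownRunners and selectRunner are invented for this example) and is not the SDK's own option parsing. In a real pipeline the command-line arguments would typically be handed to PipelineOptionsFactory.fromArgs(args) instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RunnerFlagSketch {
  // The thread states that the Direct runner is the default.
  static final String DEFAULT_RUNNER = "DirectPipelineRunner";
  static final String FLAG_PREFIX = "--runner=";

  // Hypothetical registry of runner names and short descriptions.
  static Map<String, String> knownRunners() {
    Map<String, String> runners = new LinkedHashMap<>();
    runners.put("DirectPipelineRunner", "old local runner, bounded input only");
    runners.put("InProcessPipelineRunner", "new local runner, supports unbounded input");
    return runners;
  }

  /** Returns the runner named by --runner=<name>, or the default if the flag is absent. */
  public static String selectRunner(String[] args) {
    String chosen = DEFAULT_RUNNER;
    for (String arg : args) {
      if (arg.startsWith(FLAG_PREFIX)) {
        chosen = arg.substring(FLAG_PREFIX.length());
      }
    }
    if (!knownRunners().containsKey(chosen)) {
      throw new IllegalArgumentException("Unknown runner: " + chosen);
    }
    return chosen;
  }

  public static void main(String[] args) {
    String runner = selectRunner(args);
    System.out.println(runner + ": " + knownRunners().get(runner));
  }
}
```

Running the sketch with the argument --runner=InProcessPipelineRunner selects the in-process runner; running it with no arguments falls back to the default.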
On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari <am...@yahoo.com>
wrote:
> Thanks gents
> What are our options in the meanwhile?
> Cheers
>
> Sent from my iPhone
>
> On May 2, 2016, at 12:00 AM, Dan Halperin <dh...@google.com> wrote:
>
> On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <jb...@nanthrax.net>
> wrote:
>
>> Oh, thanks Frances.
>>
>> I mixed DirectPipelineRunner ("old" local runner), and
>> InProcessPipelineRunner ("new" local runner) ;)
>>
>> We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
>>
>
> We would like to do this soon, but there are some snags.
>
> As a preparation step, Thomas swapped the default runner from Direct to
> InProcess. (#178 <https://github.com/apache/incubator-beam/pull/178>)
>
> However, testing unfortunately exposed some issues with the InProcess
> runner. (Actually, I should say "fortunately" because the tests caught it!
> Yay!) So we had to roll it back. (#198
> <https://github.com/apache/incubator-beam/pull/198>)
>
> Once we improve the InProcess runner, we can re-do the default swap. After
> the swap, once the tests keep passing for a few days, we do indeed intend
> to delete the current Direct pipeline runner and replace it with the
> current InProcess runner.
>
> Dan
>
>
>>
>> Regards
>> JB
>>
>> On 05/02/2016 03:12 AM, Frances Perry wrote:
>>
>>> +Thomas, author of the InProcessPipelineRunner
>>>
>>> The DirectPipelineRunner doesn't yet support unbounded PCollections. You
>>> can try using the InProcessPipelineRunner, which is the re-write of
>>> local execution that provides support for unbounded PCollections and
>>> better checking against the Beam Model. (We'll be renaming this to the
>>> DirectPipelineRunner in the near future to avoid having both as soon as
>>> the functionality of the InProcessPipelineRunner is complete.)
>>>
>>> On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <amirtousa@yahoo.com
>>> <ma...@yahoo.com>> wrote:
>>>
>>> Hi JB,
>>> I rebuilt my code with the latest :
>>> kafka-0.1.0-incubating-20160501.070733-11.jar
>>> <
>>> https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar
>>> >
>>> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>>> <
>>> https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>>> >
>>>
>>>
>>> Tried _without setting withMaxNumRecords()_:
>>> Throws java.lang.IllegalStateException: no evaluator registered for
>>> Read(UnboundedKafkaSource)
>>> at
>>>
>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>> at
>>>
>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>> at
>>>
>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>
>>> _With setting withMaxNumRecords()_, I see the thread is running, no
>>> exceptions like above, waiting for incoming Kafka data, but the
>>> method obtaining the data from processElement(ProcessContext ctx)
>>> never executes.
>>> Therefore, nothing goes into
>>> apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>>>
>>> I see Kafka Broker reports my laptop IP address as getting a
>>> connection to it, OK.
>>> Everything looks OK at the server side.
>>> Doesn't look like it's my lucky day.
>>> I appreciate any help/feedback/suggestion.
>>> Cheers
>>>
>>>
>>> ------------------------------------------------------------------------
>>> *From:* Jean-Baptiste Onofré <jb@nanthrax.net <mailto:
>>> jb@nanthrax.net>>
>>> *To:* user@beam.incubator.apache.org
>>> <ma...@beam.incubator.apache.org>
>>> *Sent:* Friday, April 29, 2016 10:36 PM
>>> *Subject:* Re: KafkaIO Usage & Sample Code
>>>
>>>
>>> As I said in my previous e-mail, until recently DirectPipelineRunner
>>> didn't support Unbounded.
>>>
>>> It's now fixed, so if you take a latest nightly build, or build
>>> master,
>>> it should work.
>>>
>>> As workaround, you can also limit the number of message consumed from
>>> Kafka (and so work with bounded).
>>>
>>> Regards
>>> JB
>>>
>>> On 04/29/2016 07:12 PM, amir bahmanyari wrote:
>>> > Hi colleagues,
>>> > I am moving this conversation to this users mailing list as per
>>> Max’s
>>> > suggestion.
>>> > Thanks Max.
>>> > Hi JB,
>>> > Hope all is great.
>>> > Is there a resolution to the exception I sent last night pls?
>>> > When would the sample code to use KafkaIO be released?
>>> > I really appreciate your valuable time. Below is the exception
>>> for your
>>> > reference.
>>> > This is how it gets used in my code:
>>> >
>>> >
>>>
>>> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>>> >
>>> > Have a wonderful weekend.
>>> > Exception in thread "main" java.lang.IllegalStateException: no
>>> evaluator
>>> > registered for Read(UnboundedKafkaSource)
>>> > at
>>> >
>>>
>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>> > at
>>> >
>>>
>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>> > at
>>> >
>>>
>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>> > at
>>> >
>>>
>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>> > at
>>> >
>>>
>>> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>>> > at
>>> >
>>> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>>> > at
>>> >
>>>
>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>>> > at
>>> >
>>>
>>> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>>> > at
>>> >
>>>
>>> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>>> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>>> > at
>>> >
>>>
>>> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>>> > Kind Regards,
>>> > Amir
>>>
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbonofre@apache.org <ma...@apache.org>
>>> http://blog.nanthrax.net <http://blog.nanthrax.net/>
>>> Talend - http://www.talend.com <http://www.talend.com/>
>>>
>>>
>>>
>>>
>>>
>> --
>> Jean-Baptiste Onofré
>> jbonofre@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>
>
Re: KafkaIO Usage & Sample Code
Posted by Amir Bahmanyari <am...@yahoo.com>.
Thanks gents
What are our options in the meanwhile?
Cheers
Sent from my iPhone
> On May 2, 2016, at 12:00 AM, Dan Halperin <dh...@google.com> wrote:
>
>> On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
>
>> Oh, thanks Frances.
>>
>> I mixed DirectPipelineRunner ("old" local runner), and InProcessPipelineRunner ("new" local runner) ;)
>>
>> We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
>
> We would like to do this soon, but there are some snags.
>
> As a preparation step, Thomas swapped the default runner from Direct to InProcess. (#178)
>
> However, testing unfortunately exposed some issues with the InProcess runner. (Actually, I should say "fortunately" because the tests caught it! Yay!) So we had to roll it back. (#198)
>
> Once we improve the InProcess runner, we can re-do the default swap. After the swap, once the tests keep passing for a few days, we do indeed intend to delete the current Direct pipeline runner and replace it with the current InProcess runner.
>
> Dan
>
>>
>> Regards
>> JB
>>
>>> On 05/02/2016 03:12 AM, Frances Perry wrote:
>>> +Thomas, author of the InProcessPipelineRunner
>>>
>>> The DirectPipelineRunner doesn't yet support unbounded PCollections. You
>>> can try using the InProcessPipelineRunner, which is the re-write of
>>> local execution that provides support for unbounded PCollections and
>>> better checking against the Beam Model. (We'll be renaming this to the
>>> DirectPipelineRunner in the near future to avoid having both as soon as
>>> the functionality of the InProcessPipelineRunner is complete.)
>>>
>>> On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <amirtousa@yahoo.com
>>> <ma...@yahoo.com>> wrote:
>>>
>>> Hi JB,
>>> I rebuilt my code with the latest :
>>> kafka-0.1.0-incubating-20160501.070733-11.jar
>>> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
>>> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>>> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
>>>
>>>
>>> Tried _without setting withMaxNumRecords()_:
>>> Throws java.lang.IllegalStateException: no evaluator registered for
>>> Read(UnboundedKafkaSource)
>>> at
>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>> at
>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>> at
>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>
>>> _With setting withMaxNumRecords()_, I see the thread is running, no
>>> exceptions like above, waiting for incoming Kafka data, but the
>>> method obtaining the data from processElement(ProcessContext ctx)
>>> never executes.
>>> Therefore, nothing goes into
>>> apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>>>
>>> I see Kafka Broker reports my laptop IP address as getting a
>>> connection to it, OK.
>>> Everything looks OK at the server side.
>>> Doesn't look like it's my lucky day.
>>> I appreciate any help/feedback/suggestion.
>>> Cheers
>>>
>>> ------------------------------------------------------------------------
>>> *From:* Jean-Baptiste Onofré <jb@nanthrax.net <ma...@nanthrax.net>>
>>> *To:* user@beam.incubator.apache.org
>>> <ma...@beam.incubator.apache.org>
>>> *Sent:* Friday, April 29, 2016 10:36 PM
>>> *Subject:* Re: KafkaIO Usage & Sample Code
>>>
>>>
>>> As I said in my previous e-mail, until recently DirectPipelineRunner
>>> didn't support Unbounded.
>>>
>>> It's now fixed, so if you take a latest nightly build, or build master,
>>> it should work.
>>>
>>> As workaround, you can also limit the number of message consumed from
>>> Kafka (and so work with bounded).
>>>
>>> Regards
>>> JB
>>>
>>> On 04/29/2016 07:12 PM, amir bahmanyari wrote:
>>> > Hi colleagues,
>>> > I am moving this conversation to this users mailing list as per Max’s
>>> > suggestion.
>>> > Thanks Max.
>>> > Hi JB,
>>> > Hope all is great.
>>> > Is there a resolution to the exception I sent last night pls?
>>> > When would the sample code to use KafkaIO be released?
>>> > I really appreciate your valuable time. Below is the exception
>>> for your
>>> > reference.
>>> > This is how it gets used in my code:
>>> >
>>> >
>>> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>>> >
>>> > Have a wonderful weekend.
>>> > Exception in thread "main" java.lang.IllegalStateException: no
>>> evaluator
>>> > registered for Read(UnboundedKafkaSource)
>>> > at
>>> >
>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>> > at
>>> >
>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>> > at
>>> >
>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>> > at
>>> >
>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>> > at
>>> >
>>> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>>> > at
>>> > org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>>> > at
>>> >
>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>>> > at
>>> >
>>> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>>> > at
>>> >
>>> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>>> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>>> > at
>>> >
>>> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>>> > Kind Regards,
>>> > Amir
>>>
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbonofre@apache.org <ma...@apache.org>
>>> http://blog.nanthrax.net <http://blog.nanthrax.net/>
>>> Talend - http://www.talend.com <http://www.talend.com/>
>>>
>>>
>>>
>>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbonofre@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>
Re: KafkaIO Usage & Sample Code
Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
For the tracking, I created Jira BEAM-243.
Regards
JB
On 05/02/2016 09:00 AM, Dan Halperin wrote:
> On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <jb@nanthrax.net
> <ma...@nanthrax.net>> wrote:
>
> Oh, thanks Frances.
>
> I mixed DirectPipelineRunner ("old" local runner), and
> InProcessPipelineRunner ("new" local runner) ;)
>
> We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
>
>
> We would like to do this soon, but there are some snags.
>
> As a preparation step, Thomas swapped the default runner from Direct to
> InProcess. (#178 <https://github.com/apache/incubator-beam/pull/178>)
>
> However, testing unfortunately exposed some issues with the InProcess
> runner. (Actually, I should say "fortunately" because the tests caught
> it! Yay!) So we had to roll it back. (#198
> <https://github.com/apache/incubator-beam/pull/198>)
>
> Once we improve the InProcess runner, we can re-do the default swap.
> After the swap, once the tests keep passing for a few days, we do indeed
> intend to delete the current Direct pipeline runner and replace it with
> the current InProcess runner.
>
> Dan
>
>
> Regards
> JB
>
> On 05/02/2016 03:12 AM, Frances Perry wrote:
>
> +Thomas, author of the InProcessPipelineRunner
>
> The DirectPipelineRunner doesn't yet support unbounded
> PCollections. You
> can try using the InProcessPipelineRunner, which is the re-write of
> local execution that provides support for unbounded PCollections and
> better checking against the Beam Model. (We'll be renaming this
> to the
> DirectPipelineRunner in the near future to avoid having both as
> soon as
> the functionality of the InProcessPipelineRunner is complete.)
>
> On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari
> <amirtousa@yahoo.com <ma...@yahoo.com>
> <mailto:amirtousa@yahoo.com <ma...@yahoo.com>>> wrote:
>
> Hi JB,
> I rebuilt my code with the latest :
> kafka-0.1.0-incubating-20160501.070733-11.jar
>
> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>
> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
>
>
> Tried _without setting withMaxNumRecords()_:
> Throws java.lang.IllegalStateException: no evaluator
> registered for
> Read(UnboundedKafkaSource)
> at
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> at
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> at
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>
> _With setting withMaxNumRecords()_, I see the thread is running, no
> exceptions like above, waiting for incoming Kafka data, but the
> method obtaining the data from processElement(ProcessContext ctx)
> never executes.
> Therefore, nothing goes into
> apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>
> I see Kafka Broker reports my laptop IP address as getting a
> connection to it, OK.
> Everything looks OK at the server side.
> Doesn't look like it's my lucky day.
> I appreciate any help/feedback/suggestion.
> Cheers
>
>
> ------------------------------------------------------------------------
> *From:* Jean-Baptiste Onofré <jb@nanthrax.net
> <ma...@nanthrax.net> <mailto:jb@nanthrax.net
> <ma...@nanthrax.net>>>
> *To:* user@beam.incubator.apache.org
> <ma...@beam.incubator.apache.org>
> <mailto:user@beam.incubator.apache.org
> <ma...@beam.incubator.apache.org>>
> *Sent:* Friday, April 29, 2016 10:36 PM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
>
> As I said in my previous e-mail, until recently
> DirectPipelineRunner
> didn't support Unbounded.
>
> It's now fixed, so if you take a latest nightly build, or
> build master,
> it should work.
>
> As workaround, you can also limit the number of message
> consumed from
> Kafka (and so work with bounded).
>
> Regards
> JB
>
> On 04/29/2016 07:12 PM, amir bahmanyari wrote:
> > Hi colleagues,
> > I am moving this conversation to this users mailing list
> as per Max’s
> > suggestion.
> > Thanks Max.
> > Hi JB,
> > Hope all is great.
> > Is there a resolution to the exception I sent last night
> pls?
> > When would the sample code to use KafkaIO be released?
> > I really appreciate your valuable time. Below is the
> exception
> for your
> > reference.
> > This is how it gets used in my code:
> >
> >
>
> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
> >
> > Have a wonderful weekend.
> > Exception in thread "main"
> java.lang.IllegalStateException: no
> evaluator
> > registered for Read(UnboundedKafkaSource)
> > at
> >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> > at
> >
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> > at
> >
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> > at
> >
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> > at
> >
>
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> > at
> >
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> > at
> >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> > at
> >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> > at
> >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> > at
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> > at
> >
>
> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> > Kind Regards,
> > Amir
>
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org <ma...@apache.org>
> <mailto:jbonofre@apache.org <ma...@apache.org>>
> http://blog.nanthrax.net <http://blog.nanthrax.net/>
> Talend - http://www.talend.com <http://www.talend.com/>
>
>
>
>
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org <ma...@apache.org>
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
Re: KafkaIO Usage & Sample Code
Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
It makes sense. Thanks Dan !
Regards
JB
On 05/02/2016 09:00 AM, Dan Halperin wrote:
> On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <jb@nanthrax.net
> <ma...@nanthrax.net>> wrote:
>
> Oh, thanks Frances.
>
> I mixed DirectPipelineRunner ("old" local runner), and
> InProcessPipelineRunner ("new" local runner) ;)
>
> We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
>
>
> We would like to do this soon, but there are some snags.
>
> As a preparation step, Thomas swapped the default runner from Direct to
> InProcess. (#178 <https://github.com/apache/incubator-beam/pull/178>)
>
> However, testing unfortunately exposed some issues with the InProcess
> runner. (Actually, I should say "fortunately" because the tests caught
> it! Yay!) So we had to roll it back. (#198
> <https://github.com/apache/incubator-beam/pull/198>)
>
> Once we improve the InProcess runner, we can re-do the default swap.
> After the swap, once the tests keep passing for a few days, we do indeed
> intend to delete the current Direct pipeline runner and replace it with
> the current InProcess runner.
>
> Dan
>
>
> Regards
> JB
>
> On 05/02/2016 03:12 AM, Frances Perry wrote:
>
> +Thomas, author of the InProcessPipelineRunner
>
> The DirectPipelineRunner doesn't yet support unbounded
> PCollections. You
> can try using the InProcessPipelineRunner, which is the re-write of
> local execution that provides support for unbounded PCollections and
> better checking against the Beam Model. (We'll be renaming this
> to the
> DirectPipelineRunner in the near future to avoid having both as
> soon as
> the functionality of the InProcessPipelineRunner is complete.)
>
> On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari
> <amirtousa@yahoo.com <ma...@yahoo.com>
> <mailto:amirtousa@yahoo.com <ma...@yahoo.com>>> wrote:
>
> Hi JB,
> I rebuilt my code with the latest :
> kafka-0.1.0-incubating-20160501.070733-11.jar
>
> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>
> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
>
>
> Tried _without setting withMaxNumRecords()_:
> Throws java.lang.IllegalStateException: no evaluator
> registered for
> Read(UnboundedKafkaSource)
> at
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> at
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> at
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>
> _With setting withMaxNumRecords()_, I see the thread is running, no
> exceptions like above, waiting for incoming Kafka data, but the
> method obtaining the data from processElement(ProcessContext ctx)
> never executes.
> Therefore, nothing goes into
> apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>
> I see Kafka Broker reports my laptop IP address as getting a
> connection to it, OK.
> Everything looks OK at the server side.
> Doesn't look like it's my lucky day.
> I appreciate any help/feedback/suggestion.
> Cheers
>
>
> ------------------------------------------------------------------------
> *From:* Jean-Baptiste Onofré <jb@nanthrax.net
> <ma...@nanthrax.net> <mailto:jb@nanthrax.net
> <ma...@nanthrax.net>>>
> *To:* user@beam.incubator.apache.org
> <ma...@beam.incubator.apache.org>
> <mailto:user@beam.incubator.apache.org
> <ma...@beam.incubator.apache.org>>
> *Sent:* Friday, April 29, 2016 10:36 PM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
>
> As I said in my previous e-mail, until recently
> DirectPipelineRunner
> didn't support Unbounded.
>
> It's now fixed, so if you take a latest nightly build, or
> build master,
> it should work.
>
> As workaround, you can also limit the number of message
> consumed from
> Kafka (and so work with bounded).
>
> Regards
> JB
>
> On 04/29/2016 07:12 PM, amir bahmanyari wrote:
> > Hi colleagues,
> > I am moving this conversation to this users mailing list
> as per Max’s
> > suggestion.
> > Thanks Max.
> > Hi JB,
> > Hope all is great.
> > Is there a resolution to the exception I sent last night
> pls?
> > When would the sample code to use KafkaIO be released?
> > I really appreciate your valuable time. Below is the
> exception
> for your
> > reference.
> > This is how it gets used in my code:
> >
> >
>
> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
> >
> > Have a wonderful weekend.
> > Exception in thread "main"
> java.lang.IllegalStateException: no
> evaluator
> > registered for Read(UnboundedKafkaSource)
> > at
> >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> > at
> >
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> > at
> >
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> > at
> >
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> > at
> >
>
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> > at
> >
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> > at
> >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> > at
> >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> > at
> >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> > at
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> > at
> >
>
> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> > Kind Regards,
> > Amir
>
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org <ma...@apache.org>
> <mailto:jbonofre@apache.org <ma...@apache.org>>
> http://blog.nanthrax.net <http://blog.nanthrax.net/>
> Talend - http://www.talend.com <http://www.talend.com/>
>
>
>
>
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org <ma...@apache.org>
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
Re: KafkaIO Usage & Sample Code
Posted by Dan Halperin <dh...@google.com>.
On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:
> Oh, thanks Frances.
>
> I mixed DirectPipelineRunner ("old" local runner), and
> InProcessPipelineRunner ("new" local runner) ;)
>
> We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
>
We would like to do this soon, but there are some snags.
As a preparation step, Thomas swapped the default runner from Direct to
InProcess. (#178 <https://github.com/apache/incubator-beam/pull/178>)
However, testing unfortunately exposed some issues with the InProcess
runner. (Actually, I should say "fortunately" because the tests caught it!
Yay!) So we had to roll it back. (#198
<https://github.com/apache/incubator-beam/pull/198>)
Once we improve the InProcess runner, we can re-do the default swap. After
the swap, once the tests keep passing for a few days, we do indeed intend
to delete the current Direct pipeline runner and replace it with the
current InProcess runner.
Dan
>
> Regards
> JB
>
> On 05/02/2016 03:12 AM, Frances Perry wrote:
>
>> +Thomas, author of the InProcessPipelineRunner
>>
>> The DirectPipelineRunner doesn't yet support unbounded PCollections. You
>> can try using the InProcessPipelineRunner, which is the re-write of
>> local execution that provides support for unbounded PCollections and
>> better checking against the Beam Model. (We'll be renaming this to the
>> DirectPipelineRunner in the near future to avoid having both as soon as
>> the functionality of the InProcessPipelineRunner is complete.)
>>
>> On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <amirtousa@yahoo.com
>> <ma...@yahoo.com>> wrote:
>>
>> Hi JB,
>> I rebuilt my code with the latest :
>> kafka-0.1.0-incubating-20160501.070733-11.jar
>> <
>> https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar
>> >
>> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>> <
>> https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>> >
>>
>>
>> Tried _without setting withMaxNumRecords()_:
>> Throws java.lang.IllegalStateException: no evaluator registered for
>> Read(UnboundedKafkaSource)
>> at
>>
>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>> at
>>
>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>> at
>>
>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>
>> _With setting withMaxNumRecords()_, I see the thread is running, no
>> exceptions like above, waiting for incoming Kafka data, but the
>> method obtaining the data from processElement(ProcessContext ctx)
>> never executes.
>> Therefore, nothing goes into
>> apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>>
>> I see Kafka Broker reports my laptop IP address as getting a
>> connection to it, OK.
>> Everything looks OK at the server side.
>> Doesn't look like it's my lucky day.
>> I appreciate any help/feedback/suggestion.
>> Cheers
>>
>>
>> ------------------------------------------------------------------------
>> *From:* Jean-Baptiste Onofré <jb@nanthrax.net <mailto:jb@nanthrax.net
>> >>
>> *To:* user@beam.incubator.apache.org
>> <ma...@beam.incubator.apache.org>
>> *Sent:* Friday, April 29, 2016 10:36 PM
>> *Subject:* Re: KafkaIO Usage & Sample Code
>>
>>
>> As I said in my previous e-mail, until recently DirectPipelineRunner
>> didn't support Unbounded.
>>
>> It's now fixed, so if you take a latest nightly build, or build
>> master,
>> it should work.
>>
>> As workaround, you can also limit the number of message consumed from
>> Kafka (and so work with bounded).
>>
>> Regards
>> JB
>>
>> On 04/29/2016 07:12 PM, amir bahmanyari wrote:
>> > Hi colleagues,
>> > I am moving this conversation to this users mailing list as per
>> Max’s
>> > suggestion.
>> > Thanks Max.
>> > Hi JB,
>> > Hope all is great.
>> > Is there a resolution to the exception I sent last night pls?
>> > When would the sample code to use KafkaIO be released?
>> > I really appreciate your valuable time. Below is the exception
>> for your
>> > reference.
>> > This is how it gets used in my code:
>> >
>> >
>>
>> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>> >
>> > Have a wonderful weekend.
>> > Exception in thread "main" java.lang.IllegalStateException: no
>> evaluator
>> > registered for Read(UnboundedKafkaSource)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>> > at
>> >
>> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>> > at
>> >
>>
>> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>> > at
>> >
>>
>> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>> > Kind Regards,
>> > Amir
>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbonofre@apache.org <ma...@apache.org>
>> http://blog.nanthrax.net <http://blog.nanthrax.net/>
>> Talend - http://www.talend.com <http://www.talend.com/>
>>
>>
>>
>>
>>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
Re: KafkaIO Usage & Sample Code
Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Oh, thanks Frances.
I mixed DirectPipelineRunner ("old" local runner), and
InProcessPipelineRunner ("new" local runner) ;)
We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
Regards
JB
On 05/02/2016 03:12 AM, Frances Perry wrote:
> +Thomas, author of the InProcessPipelineRunner
>
> The DirectPipelineRunner doesn't yet support unbounded PCollections. You
> can try using the InProcessPipelineRunner, which is the re-write of
> local execution that provides support for unbounded PCollections and
> better checking against the Beam Model. (We'll be renaming this to the
> DirectPipelineRunner in the near future to avoid having both as soon as
> the functionality of the InProcessPipelineRunner is complete.)
>
> On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <amirtousa@yahoo.com
> <ma...@yahoo.com>> wrote:
>
> Hi JB,
> I rebuilt my code with the latest :
> kafka-0.1.0-incubating-20160501.070733-11.jar
> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
>
>
> Tried _without setting withMaxNumRecords()_:
> Throws java.lang.IllegalStateException: no evaluator registered for
> Read(UnboundedKafkaSource)
> at
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>
> _With setting withMaxNumRecords()_, I see the thread is running, no
> exceptions like above, waiting for incoming Kafka data, but the
> method obtaining the data from processElement(ProcessContext ctx)
> never executes.
> Therefore, nothing goes into
> apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>
> I see Kafka Broker reports my laptop IP address as getting a
> connection to it, OK.
> Everything looks OK at the server side.
> Doesn't look like it's my lucky day.
> I appreciate any help/feedback/suggestion.
> Cheers
>
> ------------------------------------------------------------------------
> *From:* Jean-Baptiste Onofré <jb@nanthrax.net <ma...@nanthrax.net>>
> *To:* user@beam.incubator.apache.org
> <ma...@beam.incubator.apache.org>
> *Sent:* Friday, April 29, 2016 10:36 PM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> As I said in my previous e-mail, until recently DirectPipelineRunner
> didn't support Unbounded.
>
> It's now fixed, so if you take a latest nightly build, or build master,
> it should work.
>
> As workaround, you can also limit the number of message consumed from
> Kafka (and so work with bounded).
>
> Regards
> JB
>
> On 04/29/2016 07:12 PM, amir bahmanyari wrote:
> > Hi colleagues,
> > I am moving this conversation to this users mailing list as per Max’s
> > suggestion.
> > Thanks Max.
> > Hi JB,
> > Hope all is great.
> > Is there a resolution to the exception I sent last night pls?
> > When would the sample code to use KafkaIO be released?
> > I really appreciate your valuable time. Below is the exception
> for your
> > reference.
> > This is how it gets used in my code:
> >
> >
> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
> >
> > Have a wonderful weekend.
> > Exception in thread "main" java.lang.IllegalStateException: no
> evaluator
> > registered for Read(UnboundedKafkaSource)
> > at
> >
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> > at
> >
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> > at
> >
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> > at
> >
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> > at
> >
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> > at
> > org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> > at
> >
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> > at
> >
> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> > at
> >
> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> > at
> >
> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> > Kind Regards,
> > Amir
>
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org <ma...@apache.org>
> http://blog.nanthrax.net <http://blog.nanthrax.net/>
> Talend - http://www.talend.com <http://www.talend.com/>
>
>
>
>
--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
Re: KafkaIO Usage & Sample Code
Posted by Frances Perry <fj...@google.com>.
+Thomas, author of the InProcessPipelineRunner
The DirectPipelineRunner doesn't yet support unbounded PCollections. You
can try using the InProcessPipelineRunner, which is the re-write of local
execution that provides support for unbounded PCollections and better
checking against the Beam Model. (We'll be renaming this to the
DirectPipelineRunner in the near future to avoid having both as soon as the
functionality of the InProcessPipelineRunner is complete.)
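For readers unfamiliar with the bounded/unbounded distinction discussed here, the following is a minimal plain-Java sketch, not Beam code. It assumes a hypothetical endless iterator standing in for an unbounded source such as a Kafka topic, and shows how capping the number of records (the idea behind the withMaxNumRecords() workaround mentioned in the quoted messages) turns it into a finite, bounded read. The class and method names (BoundedReadSketch, unboundedCounter, takeFirst) are invented for illustration only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java illustration only; none of these names are Beam or Kafka APIs.
final class BoundedReadSketch {

    // Hypothetical stand-in for an unbounded source: hasNext() is always true.
    static Iterator<Integer> unboundedCounter() {
        return new Iterator<Integer>() {
            private int next = 0;

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public Integer next() {
                return next++;
            }
        };
    }

    // Reads at most maxNumRecords elements, so the result is a finite list.
    // The size check comes first so the source is not polled once the cap is reached.
    static <T> List<T> takeFirst(Iterator<T> source, int maxNumRecords) {
        List<T> out = new ArrayList<>();
        while (out.size() < maxNumRecords && source.hasNext()) {
            out.add(source.next());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(takeFirst(unboundedCounter(), 3)); // prints [0, 1, 2]
    }
}
```

Without the cap the loop would never end, which is roughly why a runner that only handles bounded input cannot evaluate an unbounded read.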
On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <am...@yahoo.com> wrote:
> Hi JB,
> I rebuilt my code with the latest snapshot jars:
> kafka-0.1.0-incubating-20160501.070733-11.jar
> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
>
>
> Tried *without setting withMaxNumRecords()*:
> Throws java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
> at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>
> *With withMaxNumRecords() set*, I see the thread is running with no
> exceptions like the one above, waiting for incoming Kafka data, but
> processElement(ProcessContext ctx), the method that obtains the data,
> never executes. Therefore, nothing goes into
> apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>
> I see the Kafka broker reports a connection from my laptop's IP address,
> OK.
> Everything looks OK on the server side.
> Doesn't look like it's my lucky day.
> I appreciate any help/feedback/suggestions.
> Cheers