Posted to user@beam.apache.org by Wyatt Frelot <wj...@gmail.com> on 2017/02/08 02:16:10 UTC

Streaming with Direct Runner

Good evening all,

I am working on a project whose goal is to show the strengths of Beam and
build a "pipeline" that ingests both bounded and unbounded data.

I use NiFi to both "batch up" and stream the same dataset. I am running
this using the Direct Runner.

Bounded: I can set up the pipeline to read and process the files. Works as
expected.

Unbounded: *Doesn't work*, but I am certain it's because I don't understand
how the pipeline needs to be created to produce the PCollection. I can't
find any good examples.

# I KNOW THIS IS WRONG

StreamingOptions streamingOptions = PipelineOptions.class.;
Pipeline stream = Pipeline.create(streamingOptions);
stream.apply("WordStream",
        Read.from(new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>()))
      .apply("ExtraLinesStream", ParDo.of(new AnomalyAlertDetector()))
      .apply("WriteToAccumuloStream", ParDo.of(new WriteToAccumuloStream()));

Re: Streaming with Direct Runner

Posted by Thomas Groh <tg...@google.com>.
You should just be able to read from the source directly - the DirectRunner
can read from both Bounded and Unbounded sources without any additional
configuration.
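To make that concrete, here is one way the snippet from the original post could be shaped so it compiles. This is only a sketch: `AnomalyAlertDetector` and `WriteToAccumuloStream` are the poster's own transforms, stubbed here so the example is self-contained; the input path is a placeholder; and the builder method names follow the current Beam Java SDK (early releases spelled some of them differently, e.g. `TextIO.Read.from`).

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class StreamingDirectRunnerSketch {

  // Stub standing in for the poster's detector; passes lines through.
  static class AnomalyAlertDetector extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(c.element());
    }
  }

  // Stub standing in for the poster's Accumulo writer.
  static class WriteToAccumuloStream extends DoFn<String, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // real code would write c.element() to Accumulo here
    }
  }

  public static void main(String[] args) {
    // Options come from PipelineOptionsFactory, not from PipelineOptions.class
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline stream = Pipeline.create(options);

    // The DirectRunner handles bounded and unbounded reads the same way:
    // just apply the read transform for the source you actually have.
    stream.apply("WordStream", TextIO.read().from("/tmp/input/*")) // placeholder path
          .apply("ExtraLinesStream", ParDo.of(new AnomalyAlertDetector()))
          .apply("WriteToAccumuloStream", ParDo.of(new WriteToAccumuloStream()));

    stream.run().waitUntilFinish();
  }
}
```

Note there is no need for BoundedToUnboundedSourceAdapter here; that adapter is runner-internal plumbing, not something a pipeline author normally instantiates.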


Re: Streaming with Direct Runner

Posted by Wyatt Frelot <wj...@gmail.com>.
Stephen,

I think you are correct. I have Apache NiFi batching up log files once they
reach a certain size. Then I have these same lines, tailed from a syslog,
sent line by line to a streaming server I created.

I was hoping there was something generic enough that I could simply point it
at an IP and port and read.

I think your idea about Kafka is a pretty good one, rather than going down
the further customization path at this time.

Thanks all for your responses. Great community and Apache Beam is awesome.

Wyatt



Re: Streaming with Direct Runner

Posted by Stephen Sisk <si...@google.com>.
hi Wyatt,

I suspect there's a terminology disconnect or we don't understand your
question. When you say "read from an unbounded source" - what are you
trying to read from? Kafka/Pub/Sub/directly from NiFi/etc.? You mention that
for batch you're reading from files - are you also wanting to read from
files for streaming (i.e., tailing files/watching for new files)? You mention
specifying a port, so I'm guessing that you're not reading from files.

Along related lines: do you have a specific Beam Source class/read
transform that you're using to do the read? That is, are you using
something like KafkaIO? If you're trying to read from NiFi, it looks like
it can emit data to Kafka/etc. If you want to read directly from NiFi you
might have to write a NiFi unbounded source (or use the brand-new
SplittableDoFn), but it would probably be easier to have NiFi write to
Kafka or something similar, and then have Beam read from that.

(I assume you're talking about Apache NiFi)

S
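A minimal sketch of the NiFi -> Kafka -> Beam route described above, assuming NiFi is already publishing log lines to a topic. The broker address and topic name are placeholders, and the KafkaIO builder methods shown follow the current beam-sdks-java-io-kafka module (early releases configured coders rather than deserializers).

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Reading a Kafka topic yields an unbounded PCollection; the
    // DirectRunner keeps consuming as NiFi publishes new lines.
    PCollection<String> lines =
        p.apply("ReadFromNifiTopic",
                KafkaIO.<String, String>read()
                    .withBootstrapServers("localhost:9092") // placeholder broker
                    .withTopic("nifi-logs")                 // placeholder topic
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withoutMetadata())                     // drop Kafka metadata
         .apply(Values.<String>create());                   // keep only the values

    // ... downstream ParDo/windowing transforms go here ...

    p.run();
  }
}
```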


Re: Streaming with Direct Runner

Posted by Wyatt Frelot <wj...@gmail.com>.
I guess the question is: what is the code to read from an unbounded source?
I have gone through the documentation and it has been unclear... specifically,
identifying the port to pull from.

Simply put, I had a hard time understanding how to read from an unbounded
source.

Wyatt F.



Re: Streaming with Direct Runner

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi

The Direct Runner is able to read from an unbounded source. The next transforms of your pipeline have to deal with an unbounded PCollection.

Regards
JB
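JB's point about downstream transforms, in a sketch: an unbounded PCollection can flow through element-wise ParDos unchanged, but any aggregation (GroupByKey, Count, etc.) needs a windowing strategy first, or it would wait forever for input to end. Here `lines` stands in for whatever unbounded PCollection<String> the read produced; this fragment belongs inside a pipeline like the ones above.

```java
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Windowing slices the unbounded stream into finite pieces that can
// each be aggregated; here, one count per word per one-minute window.
PCollection<KV<String, Long>> counts =
    lines
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(Count.<String>perElement());
```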
