Posted to user@flink.apache.org by Ahmed Nader <ah...@gmail.com> on 2016/05/26 15:09:20 UTC

Collect output of transformations on a custom source in real time

Hello,
I have defined a custom source function for an infinite stream source.
In my overridden run method I have a while (true) loop that keeps
listening for input. I want to apply some transformations to the
resulting datastream from my source and collect the output produced so far
by these transformations in a collection.
However, when I leave my source running in an infinite loop, nothing is
really executed.
Here are some parts of my code to clarify more:

My custom source class:
public class FeedSource implements SourceFunction<Object>

The run method in this class has a while(boolean variable == true)

Then I add my source and apply a filter to it:
datastream = env.addSource(new FeedSource()).filter();

Then I execute:
env.execute();

Then I want to collect my datastream in a collection:
Iterator iter = DataStreamUtils.collect(datastream);

So, first of all, is it possible to apply a filter to my stream that way?
And if so, is it possible to keep updating my collection with the content
of my datastream so far?

I hope I was able to make my question clear enough.
Thanks,
Ahmed
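
The run loop described in the question (a while loop guarded by a boolean) is usually paired with a way to stop it from another thread. Flink's SourceFunction interface declares a cancel() method next to run() for that purpose. The sketch below is a plain-Java model of that pattern with no Flink dependency; CountingSource, runBriefly and the Consumer-based run signature are hypothetical names chosen for illustration, not the poster's FeedSource.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class CancellableLoopSketch {

    // Hypothetical stand-in for a custom source: emits 0, 1, 2, ... until cancelled.
    static class CountingSource {
        // volatile so that a cancel() from another thread is seen by the loop
        private volatile boolean running = true;

        void run(Consumer<Integer> out) {
            int next = 0;
            while (running) {
                out.accept(next++);
                try {
                    Thread.sleep(1); // simulate waiting for input
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        void cancel() {
            running = false;
        }
    }

    // Runs the source on a worker thread, waits for at least minElements,
    // then cancels the loop and returns everything emitted.
    static List<Integer> runBriefly(int minElements) {
        CountingSource source = new CountingSource();
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        Thread worker = new Thread(() -> source.run(seen::add));
        worker.start();
        try {
            while (seen.size() < minElements) {
                Thread.sleep(1);
            }
            source.cancel();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            source.cancel(); // make sure the loop stops even on failure
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(runBriefly(5).size() >= 5); // prints "true"
    }
}
```

The loop itself never ends on its own, so whatever calls run() directly will not return until cancel() is invoked from elsewhere.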

Re: Collect output of transformations on a custom source in real time

Posted by Stephan Ewen <se...@apache.org>.
Hi!

I am not sure I understand the problem exactly, but one problem I see in
your code is that you call "env.execute()" and then
"DataStreamUtils.collect(datastream);"

The first call to "env.execute()" will start the program (source and
filter) and the results will simply go nowhere.
Then you call "DataStreamUtils.collect(datastream);", which internally
calls "execute" again.

In short: remove the first call to "env.execute()"; that should do the
trick.

Stephan
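
To illustrate why the order of the calls matters, here is a plain-Java sketch (no Flink dependency) of the idea behind the advice above: the call that hands back the iterator is also the one that starts the endless job, so no separate blocking start call should come before it. The collect and firstN methods below are hypothetical stand-ins written for this example; they only mimic the shape of DataStreamUtils.collect(datastream) and are not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

public class CollectSketch {

    // Starts a hypothetical endless source (0, 1, 2, ...) on a daemon thread,
    // applies the filter, and returns an iterator that blocks until the next
    // matching element is available.
    static Iterator<Integer> collect(Predicate<Integer> filter) {
        BlockingQueue<Integer> results = new LinkedBlockingQueue<>();
        Thread job = new Thread(() -> {
            for (int i = 0; ; i++) {
                if (filter.test(i)) {
                    results.add(i);
                }
                try {
                    Thread.sleep(1); // simulate waiting for input
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        job.setDaemon(true); // do not keep the JVM alive for the endless job
        job.start();

        return new Iterator<Integer>() {
            @Override
            public boolean hasNext() {
                return true; // the stream is unbounded
            }

            @Override
            public Integer next() {
                try {
                    return results.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            }
        };
    }

    // Copies the first n elements seen so far into a list.
    static List<Integer> firstN(Iterator<Integer> it, int n) {
        List<Integer> soFar = new ArrayList<>();
        for (int k = 0; k < n && it.hasNext(); k++) {
            soFar.add(it.next());
        }
        return soFar;
    }

    public static void main(String[] args) {
        Iterator<Integer> iter = collect(x -> x % 2 == 0);
        System.out.println(firstN(iter, 3)); // prints "[0, 2, 4]"
    }
}
```

In this model, a blocking "run the job" call placed before collect would never return for an endless source, so the line that obtains the iterator would never be reached. Pulling from the iterator in a loop is what keeps a local collection updated with the output so far.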

