Posted to user@flink.apache.org by Pedro Monteiro <pe...@gmail.com> on 2017/02/16 09:30:56 UTC

Streaming data from MongoDB using Flink

Good morning,

I am trying to get data from MongoDB to be analysed in Flink.
I would like to know if it is possible to stream data from MongoDB into
Flink. I have looked into implementing Flink's SourceFunction to pass to
the addSource method of the StreamExecutionEnvironment, but had no luck.
Can anyone help me out?
Thanks.

*Pedro Lima Monteiro*

Re: Streaming data from MongoDB using Flink

Posted by Pedro Monteiro <pe...@gmail.com>.
Dear Gordon, Till

Thank you so much for your helpful answers. I managed to solve my problem
with your guidelines.

Much appreciated, keep up the good work!

Cheers

Regards,

*Pedro Lima Monteiro*


Re: Streaming data from MongoDB using Flink

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Sorry, I just realized I didn’t notice the second question in your last email when replying.
Thanks Till for answering it!



Re: Streaming data from MongoDB using Flink

Posted by Till Rohrmann <tr...@apache.org>.
Hi Pedro,

In order to add new sources, you have to first stop the job (taking a
savepoint if you want to resume later on) and then restart the job with the
changed topology.

Cheers,
Till
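
In CLI terms, the sequence Till describes looks roughly like this (a
sketch; the job id, savepoint path, and job jar are placeholders):

    # take a savepoint of the running job, then stop it
    bin/flink savepoint <jobID>
    bin/flink cancel <jobID>

    # resubmit the job, now with the additional source, from the savepoint
    bin/flink run -s <savepointPath> your-job.jar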


Re: Streaming data from MongoDB using Flink

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Good to know!



Re: Streaming data from MongoDB using Flink

Posted by Pedro Monteiro <pe...@gmail.com>.
Dear Gordon,

Thanks for your help; I think I am on the right track now.

On the other hand, I have another question: is it possible to add sources
to environments that are already executing? In what I am currently
developing, I need to add new sources as they arrive in my system.

I will wait to hear from you!

Regards,

*Pedro Lima Monteiro*


Re: Streaming data from MongoDB using Flink

Posted by Pedro Monteiro <pe...@gmail.com>.
Thank you again for your prompt response.

I will give it a try and will come back to you.

*Pedro Lima Monteiro*


Re: Streaming data from MongoDB using Flink

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
I would recommend checking out the Flink RabbitMQ Source for examples:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java

For your case, you should extend the `RichSourceFunction`, which additionally lets you override the `open()` life-cycle method.
In that method, you instantiate your MongoDB client connection and fetch the cursor. In the `run()` method, you should essentially have a while loop that polls the MongoDB cursor and emits the fetched documents using the `SourceContext`.
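
Put together, a minimal sketch of such a source might look as follows
(assuming the MongoDB Java driver 3.x; the host, database, and collection
names are placeholders, and the tailable cursor requires the collection to
be capped):

    import com.mongodb.CursorType;
    import com.mongodb.MongoClient;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.MongoCursor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.bson.Document;

    public class MongoDBSource extends RichSourceFunction<Document> {

        private transient MongoClient client;
        private transient MongoCursor<Document> cursor;
        private volatile boolean running = true;

        @Override
        public void open(Configuration parameters) {
            // Called once per parallel instance before run(): create the
            // client and a tailable cursor (requires a capped collection).
            client = new MongoClient("localhost", 27017);
            MongoCollection<Document> collection =
                    client.getDatabase("mydb").getCollection("events");
            cursor = collection.find()
                    .cursorType(CursorType.TailableAwait)
                    .iterator();
        }

        @Override
        public void run(SourceContext<Document> ctx) throws Exception {
            // hasNext() blocks until a new document arrives, so documents
            // are emitted as they are written to the collection.
            while (running && cursor.hasNext()) {
                ctx.collect(cursor.next());
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void close() {
            if (cursor != null) cursor.close();
            if (client != null) client.close();
        }
    }

With a class like this, the addSource call shown in Pedro's message further
down the thread becomes env.addSource(new MongoDBSource()).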

If you’re also looking to implement a MongoDB source that works with Flink’s checkpointing for exactly-once, be sure to check out:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#stateful-source-functions
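
For the exactly-once case, one possible shape is a source that remembers
the last `_id` it emitted and resumes from there after a restore. This is
only a sketch, assuming Flink's `ListCheckpointed` interface from the
linked docs and a monotonically increasing `_id`; it reuses the imports of
the sketch above, plus the ones listed here:

    import java.util.Collections;
    import java.util.List;
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.bson.types.ObjectId;

    public class ResumableMongoDBSource extends RichSourceFunction<Document>
            implements ListCheckpointed<String> {

        private transient MongoClient client;
        private transient MongoCollection<Document> collection;
        private volatile boolean running = true;
        private String lastId; // checkpointed: hex of the last emitted _id

        @Override
        public void open(Configuration parameters) {
            client = new MongoClient("localhost", 27017);
            collection = client.getDatabase("mydb").getCollection("events");
        }

        @Override
        public void run(SourceContext<Document> ctx) throws Exception {
            while (running) {
                Document doc = pollNext();
                if (doc == null) {
                    Thread.sleep(50); // nothing new yet
                    continue;
                }
                // Emit and advance the offset under the checkpoint lock so
                // that no snapshot can fall between the two.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(doc);
                    lastId = doc.getObjectId("_id").toHexString();
                }
            }
        }

        // Oldest document whose _id is greater than the last emitted one.
        private Document pollNext() {
            Document filter = lastId == null
                    ? new Document()
                    : new Document("_id",
                            new Document("$gt", new ObjectId(lastId)));
            return collection.find(filter)
                    .sort(new Document("_id", 1))
                    .first();
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public List<String> snapshotState(long checkpointId, long timestamp) {
            return lastId == null
                    ? Collections.<String>emptyList()
                    : Collections.singletonList(lastId);
        }

        @Override
        public void restoreState(List<String> state) {
            lastId = state.isEmpty() ? null : state.get(0);
        }
    }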

Cheers,
Gordon

Re: Streaming data from MongoDB using Flink

Posted by Pedro Monteiro <pe...@gmail.com>.
Dear Tzu-Li,

Thank you so much for your prompt response.

Let's assume I have a variable in Java, env, which is my
StreamExecutionEnvironment. When I go ahead and attempt to do:

    env.addSource();

It requests an implementation of the SourceFunction interface:

    env.addSource(new SourceFunction<Document>() {

        @Override
        public void run(SourceFunction.SourceContext<Document> ctx)
                throws Exception {
            // TO DO
        }

        @Override
        public void cancel() {
            // TO DO
        }
    });

And this is where I'm somewhat stuck. I do not understand how I should
access my MongoDB cursor in any of these methods (I suppose the most
suitable would be the "run" method) in a way that would allow me to emit a
new MongoDB document as it arrives in the database from another source.

Once again, thank you so much for your help.

I will wait to hear from you!

Regards,

*Pedro Lima Monteiro*


Re: Streaming data from MongoDB using Flink

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Pedro!

This is definitely possible by simply writing a Flink `SourceFunction` that uses a MongoDB client to fetch the data.
It should be straightforward, and it works well with MongoDB’s cursor APIs.

Could you explain a bit which part in particular you were stuck with?

Cheers,
Gordon

